Re: eager execution and debuggability

2018-05-10 Thread Ryan Blue
> it would be fantastic if we could make it easier to debug Spark programs
without needing to rely on eager execution.

I agree, it would be great if we could make the errors more clear about
where the error happened (user code or in Spark code) and what assumption
was violated. The problem is that this is a really hard thing to do
generally, like Reynold said. I think we should look for individual cases
where we can improve feedback so we can take a deeper look.

For example, we have an error case where users get a `NullPointerException`
in generated code. This was a huge pain to track down the first time, but
the problem is almost always that the user registered a UDF that returns an
object and Spark inferred that it would be non-null but the user's code
returns null. In these cases, we could add better error messages to
generated code, like "Column 'x = some_udf(y)' is required, but the value
was null". That would be really useful.

> I used to use an evaluate(dataframe) -> DataFrame function that simply
forces the materialization of a dataframe.

We have one of these, too. `display` that will run a dataframe and format
it for notebooks (html and text output). We also have a `materialize`
method that materializes a dataframe or RDD, like people use `count` for,
but that returns the materialized RDD so we can reuse it from the last
shuffle (we use this to avoid caching). It would be great if it were easier
to reuse the RDDs materialized by these calls, or even automatic. Right
now, if you run `show`, Spark doesn't know that a dataframe was
materialized and won't reuse the results unless you keep a reference to it.

We also have a problem where a dataframe used multiple times will cause
several table scans when the filters or projected columns change. That's
because each action optimizes the dataframe without knowing about the next.
I'd love to hear ideas on how to fix this.

On Wed, May 9, 2018 at 5:39 AM, Tim Hunter  wrote:

> The repr() trick is neat when working on a notebook. When working in a
> library, I used to use an evaluate(dataframe) -> DataFrame function that
> simply forces the materialization of a dataframe. As Reynold mentions, this
> is very convenient when working on a lot of chained UDFs, and it is a
> standard trick in lazy environments and languages.
>
> Tim
>
> On Wed, May 9, 2018 at 3:26 AM, Reynold Xin  wrote:
>
>> Yes would be great if possible but it’s non trivial (might be impossible
>> to do in general; we already have stacktraces that point to line numbers
>> when an error occur in UDFs but clearly that’s not sufficient). Also in
>> environments like REPL it’s still more useful to show error as soon as it
>> occurs, rather than showing it potentially 30 lines later.
>>
>> On Tue, May 8, 2018 at 7:22 PM Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> This may be technically impractical, but it would be fantastic if we
>>> could make it easier to debug Spark programs without needing to rely on
>>> eager execution. Sprinkling .count() and .checkpoint() at various
>>> points in my code is still a debugging technique I use, but it always makes
>>> me wish Spark could point more directly to the offending transformation
>>> when something goes wrong.
>>>
>>> Is it somehow possible to have each individual operator (is that the
>>> correct term?) in a DAG include metadata pointing back to the line of code
>>> that generated the operator? That way when an action triggers an error, the
>>> failing operation can point to the relevant line of code — even if it’s a
>>> transformation — and not just the action on the tail end that triggered the
>>> error.
>>>
>>> I don’t know how feasible this is, but addressing it would directly
>>> solve the issue of linking failures to the responsible transformation, as
>>> opposed to leaving the user to break up a chain of transformations with
>>> several debug actions. And this would benefit new and experienced users
>>> alike.
>>>
>>> Nick
>>>
>>> 2018년 5월 8일 (화) 오후 7:09, Ryan Blue rb...@netflix.com.invalid
>>> <http://mailto:rb...@netflix.com.invalid>님이 작성:
>>>
>>> I've opened SPARK-24215 to track this.
>>>>
>>>> On Tue, May 8, 2018 at 3:58 PM, Reynold Xin 
>>>> wrote:
>>>>
>>>>> Yup. Sounds great. This is something simple Spark can do and provide
>>>>> huge value to the end users.
>>>>>
>>>>>
>>>>> On Tue, May 8, 2018 at 3:53 PM Ryan Blue  wrote:
>>>>>
>>>>>> Would be great if it is somethin

Re: Time for 2.3.1?

2018-05-10 Thread Ryan Blue
Parquet has a Java patch release, 1.8.3, that should pass tomorrow morning.
I think the plan is to get that in to fix a bug with Parquet data written
by Impala.

On Thu, May 10, 2018 at 11:09 AM, Marcelo Vanzin 
wrote:

> Hello all,
>
> It's been a while since we shipped 2.3.0 and lots of important bug
> fixes have gone into the branch since then. I took a look at Jira and
> it seems there's not a lot of things explicitly targeted at 2.3.1 -
> the only potential blocker (a parquet issue) is being worked on since
> a new parquet with the fix was just released.
>
> So I'd like to propose to release 2.3.1 soon. If there are important
> fixes that should go into the release, please let those be known (by
> replying here or updating the bug in Jira), otherwise I'm volunteering
> to prepare the first RC soon-ish (around the weekend).
>
> Thanks!
>
>
> --
> Marcelo
>
> -----
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: Time for 2.3.1?

2018-05-11 Thread Ryan Blue
The Parquet Java 1.8.3 release is out. Has anyone started a PR to update,
or should I?

On Fri, May 11, 2018 at 7:40 AM, Cody Koeninger  wrote:

> Sounds good, I'd like to add SPARK-24067 today assuming there's no
> objections
>
> On Thu, May 10, 2018 at 1:22 PM, Henry Robinson  wrote:
> > +1, I'd like to get a release out with SPARK-23852 fixed. The Parquet
> > community are about to release 1.8.3 - the voting period closes tomorrow
> -
> > and I've tested it with Spark 2.3 and confirmed the bug is fixed.
> Hopefully
> > it is released and I can post the version change to branch-2.3 before you
> > start to roll the RC this weekend.
> >
> > Henry
> >
> > On 10 May 2018 at 11:09, Marcelo Vanzin  wrote:
> >>
> >> Hello all,
> >>
> >> It's been a while since we shipped 2.3.0 and lots of important bug
> >> fixes have gone into the branch since then. I took a look at Jira and
> >> it seems there's not a lot of things explicitly targeted at 2.3.1 -
> >> the only potential blocker (a parquet issue) is being worked on since
> >> a new parquet with the fix was just released.
> >>
> >> So I'd like to propose to release 2.3.1 soon. If there are important
> >> fixes that should go into the release, please let those be known (by
> >> replying here or updating the bug in Jira), otherwise I'm volunteering
> >> to prepare the first RC soon-ish (around the weekend).
> >>
> >> Thanks!
> >>
> >>
> >> --
> >> Marcelo
> >>
> >> -
> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> >>
> >
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: eager execution and debuggability

2018-05-21 Thread Ryan Blue
+1 to job and stage info in the SQL visualization. This is one of the most
difficult places for both users and our data platform team to understand.

We've resorted to logging the plan that is compiled in
`WholeStageCodegenExec` so at least we can go from a stage to what the plan
was, but there's no context for some jobs. A good example is the range
estimation for an `order by`. The job runs and there's nothing that shows
what it did unless you stack trace tasks while it runs.

rb

On Mon, May 14, 2018 at 6:46 AM, Tomasz Gawęda 
wrote:

> Hi,
>
> >I agree, it would be great if we could make the errors more clear about
> where the error happened (user code or in Spark code) and what assumption
> was violated. The problem is that this is a really hard thing to do
> generally, like Reynold said. I think we should look for individual cases
> where we can improve feedback so we can take a deeper look.
>
>
> Huge +1 from my side. In current project we see that it's very hard to see
> why program failed, what was wrong and in which column. Sometimes it's
> easier to use UDFs, because at least you can wrote custom assertions in
> validation-sensitive functions. It's getting worse in WholeStageCodegen,
> where generated code does not look like the code we wrote. If you know
> Spark a bit more you have intuition where to search for errors, but I saw
> that my friends felt "lost" sometimes and needed my help and tips how to
> check where is error. More explicit assertions would make debugging much
> easier even for more experienced developers.
>
> For performance investigation, it would be great to have job info in SQL
> Visualization - i.e., possibility to "expand" nodes and see how many jobs
> was triggered and number of tasks and their duration. Now it's hard to
> debug it, especially for newbies.
>
> Pozdrawiam / Best regards,
> Tomek Gawęda
>
>
> On 2018-05-10 18:31, Ryan Blue wrote:
>
> > it would be fantastic if we could make it easier to debug Spark programs
> without needing to rely on eager execution.
>
> I agree, it would be great if we could make the errors more clear about
> where the error happened (user code or in Spark code) and what assumption
> was violated. The problem is that this is a really hard thing to do
> generally, like Reynold said. I think we should look for individual cases
> where we can improve feedback so we can take a deeper look.
>
> For example, we have an error case where users get a
> `NullPointerException` in generated code. This was a huge pain to track
> down the first time, but the problem is almost always that the user
> registered a UDF that returns an object and Spark inferred that it would be
> non-null but the user's code returns null. In these cases, we could add
> better error messages to generated code, like "Column 'x = some_udf(y)' is
> required, but the value was null". That would be really useful.
>
> > I used to use an evaluate(dataframe) -> DataFrame function that simply
> forces the materialization of a dataframe.
>
> We have one of these, too. `display` that will run a dataframe and format
> it for notebooks (html and text output). We also have a `materialize`
> method that materializes a dataframe or RDD, like people use `count` for,
> but that returns the materialized RDD so we can reuse it from the last
> shuffle (we use this to avoid caching). It would be great if it were easier
> to reuse the RDDs materialized by these calls, or even automatic. Right
> now, if you run `show`, Spark doesn't know that a dataframe was
> materialized and won't reuse the results unless you keep a reference to it.
>
> We also have a problem where a dataframe used multiple times will cause
> several table scans when the filters or projected columns change. That's
> because each action optimizes the dataframe without knowing about the next.
> I'd love to hear ideas on how to fix this.
>
> On Wed, May 9, 2018 at 5:39 AM, Tim Hunter 
> wrote:
>
>> The repr() trick is neat when working on a notebook. When working in a
>> library, I used to use an evaluate(dataframe) -> DataFrame function that
>> simply forces the materialization of a dataframe. As Reynold mentions, this
>> is very convenient when working on a lot of chained UDFs, and it is a
>> standard trick in lazy environments and languages.
>>
>> Tim
>>
>> On Wed, May 9, 2018 at 3:26 AM, Reynold Xin  wrote:
>>
>>> Yes would be great if possible but it’s non trivial (might be impossible
>>> to do in general; we already have stacktraces that point to line numbers
>>> when an error occur in UDFs but clearly that’s not sufficient

Re: Very slow complex type column reads from parquet

2018-06-12 Thread Ryan Blue
Jakub,

You're right that Spark currently doesn't use the vectorized read path for
nested data, but I'm not sure that's the problem here. With 50k elements in
the f1 array, it could easily be that you're getting the significant
speed-up from not reading or materializing that column. The non-vectorized
path is slower, but it is more likely that the problem is the data if it is
that much slower.

I'd be happy to see vectorization for nested Parquet data move forward, but
I think you might want to get an idea of how much it will help before you
move forward with it. Can you use Impala to test whether vectorization
would help here?

rb



On Mon, Jun 11, 2018 at 6:16 AM, Jakub Wozniak 
wrote:

> Hello,
>
> We have stumbled upon a quite degraded performance when reading a complex
> (struct, array) type columns stored in Parquet.
> A Parquet file is of around 600MB (snappy) with ~400k rows with a field of
> a complex type { f1: array of ints, f2: array of ints } where f1 array
> length is 50k elements.
> There are also other fields like entity_id: long, timestamp: long.
>
> A simple query that selects rows using predicates entity_id = X and
> timestamp >= T1 and timestamp <= T2 plus ds.show() takes 17 minutes to
> execute.
> If we remove the complex type columns from the query it is executed in a
> sub-second time.
>
> Now when looking at the implementation of the Parquet datasource the
> Vectorized* classes are used only if the read types are primitives. In
> other case the code falls back to the parquet-mr default implementation.
> In the VectorizedParquetRecordReader there is a TODO to handle complex
> types that "should be efficient & easy with codegen".
>
> For our CERN Spark usage the current execution times are pretty much
> prohibitive as there is a lot of data stored as arrays / complex types…
> The file of 600 MB represents 1 day of measurements and our data
> scientists would like to process sometimes months or even years of those.
>
> Could you please let me know if there is anybody currently working on it
> or maybe you have it in a roadmap for the future?
> Or maybe you could give me some suggestions how to avoid / resolve this
> problem? I’m using Spark 2.2.1.
>
> Best regards,
> Jakub Wozniak
>
>
>
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: Very slow complex type column reads from parquet

2018-06-18 Thread Ryan Blue
e - is Parquet reader reading & decoding the projection
> columns even if the predicate columns should filter the record out?
>
> Unfortunately we have to have those big columns in the query as people
> want to do analysis on them.
>
> We will continue to investigate…
>
> Cheers,
> Jakub
>
>
>
> On 12 Jun 2018, at 22:51, Ryan Blue  wrote:
>
> Jakub,
>
> You're right that Spark currently doesn't use the vectorized read path for
> nested data, but I'm not sure that's the problem here. With 50k elements in
> the f1 array, it could easily be that you're getting the significant
> speed-up from not reading or materializing that column. The non-vectorized
> path is slower, but it is more likely that the problem is the data if it is
> that much slower.
>
> I'd be happy to see vectorization for nested Parquet data move forward,
> but I think you might want to get an idea of how much it will help before
> you move forward with it. Can you use Impala to test whether vectorization
> would help here?
>
> rb
>
>
>
> On Mon, Jun 11, 2018 at 6:16 AM, Jakub Wozniak 
> wrote:
>
>> Hello,
>>
>> We have stumbled upon a quite degraded performance when reading a complex
>> (struct, array) type columns stored in Parquet.
>> A Parquet file is of around 600MB (snappy) with ~400k rows with a field
>> of a complex type { f1: array of ints, f2: array of ints } where f1 array
>> length is 50k elements.
>> There are also other fields like entity_id: long, timestamp: long.
>>
>> A simple query that selects rows using predicates entity_id = X and
>> timestamp >= T1 and timestamp <= T2 plus ds.show() takes 17 minutes to
>> execute.
>> If we remove the complex type columns from the query it is executed in a
>> sub-second time.
>>
>> Now when looking at the implementation of the Parquet datasource the
>> Vectorized* classes are used only if the read types are primitives. In
>> other case the code falls back to the parquet-mr default implementation.
>> In the VectorizedParquetRecordReader there is a TODO to handle complex
>> types that "should be efficient & easy with codegen".
>>
>> For our CERN Spark usage the current execution times are pretty much
>> prohibitive as there is a lot of data stored as arrays / complex types…
>> The file of 600 MB represents 1 day of measurements and our data
>> scientists would like to process sometimes months or even years of those.
>>
>> Could you please let me know if there is anybody currently working on it
>> or maybe you have it in a roadmap for the future?
>> Or maybe you could give me some suggestions how to avoid / resolve this
>> problem? I’m using Spark 2.2.1.
>>
>> Best regards,
>> Jakub Wozniak
>>
>>
>>
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
>
>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: Time for 2.3.2?

2018-06-28 Thread Ryan Blue
+1

On Thu, Jun 28, 2018 at 9:34 AM Xiao Li  wrote:

> +1. Thanks, Saisai!
>
> The impact of SPARK-24495 is large. We should release Spark 2.3.2 ASAP.
>
> Thanks,
>
> Xiao
>
> 2018-06-27 23:28 GMT-07:00 Takeshi Yamamuro :
>
>> +1, I heard some Spark users have skipped v2.3.1 because of these bugs.
>>
>> On Thu, Jun 28, 2018 at 3:09 PM Xingbo Jiang 
>> wrote:
>>
>>> +1
>>>
>>> Wenchen Fan 于2018年6月28日 周四下午2:06写道:
>>>
>>>> Hi Saisai, that's great! please go ahead!
>>>>
>>>> On Thu, Jun 28, 2018 at 12:56 PM Saisai Shao 
>>>> wrote:
>>>>
>>>>> +1, like mentioned by Marcelo, these issues seems quite severe.
>>>>>
>>>>> I can work on the release if short of hands :).
>>>>>
>>>>> Thanks
>>>>> Jerry
>>>>>
>>>>>
>>>>> Marcelo Vanzin  于2018年6月28日周四 上午11:40写道:
>>>>>
>>>>>> +1. SPARK-24589 / SPARK-24552 are kinda nasty and we should get fixes
>>>>>> for those out.
>>>>>>
>>>>>> (Those are what delayed 2.2.2 and 2.1.3 for those watching...)
>>>>>>
>>>>>> On Wed, Jun 27, 2018 at 7:59 PM, Wenchen Fan 
>>>>>> wrote:
>>>>>> > Hi all,
>>>>>> >
>>>>>> > Spark 2.3.1 was released just a while ago, but unfortunately we
>>>>>> discovered
>>>>>> > and fixed some critical issues afterward.
>>>>>> >
>>>>>> > SPARK-24495: SortMergeJoin may produce wrong result.
>>>>>> > This is a serious correctness bug, and is easy to hit: have
>>>>>> duplicated join
>>>>>> > key from the left table, e.g. `WHERE t1.a = t2.b AND t1.a = t2.c`,
>>>>>> and the
>>>>>> > join is a sort merge join. This bug is only present in Spark 2.3.
>>>>>> >
>>>>>> > SPARK-24588: stream-stream join may produce wrong result
>>>>>> > This is a correctness bug in a new feature of Spark 2.3: the
>>>>>> stream-stream
>>>>>> > join. Users can hit this bug if one of the join side is partitioned
>>>>>> by a
>>>>>> > subset of the join keys.
>>>>>> >
>>>>>> > SPARK-24552: Task attempt numbers are reused when stages are retried
>>>>>> > This is a long-standing bug in the output committer that may
>>>>>> introduce data
>>>>>> > corruption.
>>>>>> >
>>>>>> > SPARK-24542: UDFXPath allow users to pass carefully crafted XML
>>>>>> to
>>>>>> > access arbitrary files
>>>>>> > This is a potential security issue if users build access control
>>>>>> module upon
>>>>>> > Spark.
>>>>>> >
>>>>>> > I think we need a Spark 2.3.2 to address these issues(especially the
>>>>>> > correctness bugs) ASAP. Any thoughts?
>>>>>> >
>>>>>> > Thanks,
>>>>>> > Wenchen
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Marcelo
>>>>>>
>>>>>> -
>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>
>>>>>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: [DISCUSS] SPIP: Standardize SQL logical plans

2018-07-12 Thread Ryan Blue
Thanks! I'm all for calling a vote on the SPIP. If I understand the process
correctly, the intent is for a "shepherd" to do it. I'm happy to call a
vote, or feel free if you'd like to play that role.

Other comments:
* DeleteData API: I completely agree that we need to have a proposal for
it. I think the SQL side is easier because DELETE FROM is already a
statement. We just need to be able to identify v2 tables to use it. I'll
come up with something and send a proposal to the dev list.
* Table create/drop/alter/load API: I think we have agreement around the
proposed DataSourceV2 API, but we need to decide how the public API will
work and how this will fit in with ExternalCatalog (see the other thread
for discussion there). Do you think we need to get that entire SPIP
approved before we can start getting the API in? If so, what do you think
needs to be decided to get it ready?

Thanks!

rb

On Wed, Jul 11, 2018 at 8:24 PM Wenchen Fan  wrote:

> Hi Ryan,
>
> Great job on this! Shall we call a vote for the plan standardization SPIP?
> I think this is a good idea and we should do it.
>
> Notes:
> We definitely need new user-facing APIs to produce these new logical plans
> like DeleteData. But we need a design doc for these new APIs after the SPIP
> passed.
> We definitely need the data source to provide the ability to
> create/drop/alter/lookup tables, but that belongs to the other SPIP and
> should be voted separately.
>
> Thanks,
> Wenchen
>
> On Fri, Apr 20, 2018 at 5:01 AM Ryan Blue 
> wrote:
>
>> Hi everyone,
>>
>> A few weeks ago, I wrote up a proposal to standardize SQL logical plans
>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5ace0718#heading=h.m45webtwxf2d>
>>  and
>> a supporting design doc for data source catalog APIs
>> <https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#heading=h.m45webtwxf2d>.
>> From the comments on those docs, it looks like we mostly have agreement
>> around standardizing plans and around the data source catalog API.
>>
>> We still need to work out details, like the transactional API extension,
>> but I'd like to get started implementing those proposals so we have
>> something working for the 2.4.0 release. I'm starting this thread because I
>> think we're about ready to vote on the proposal
>> <https://spark.apache.org/improvement-proposals.html#discussing-an-spip>
>> and I'd like to get any remaining discussion going or get anyone that
>> missed this to read through the docs.
>>
>> Thanks!
>>
>> rb
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


[VOTE] SPIP: Standardize SQL logical plans

2018-07-17 Thread Ryan Blue
Hi everyone,

>From discussion on the proposal doc and the discussion thread, I think we
have consensus around the plan to standardize logical write operations for
DataSourceV2. I would like to call a vote on the proposal.

The proposal doc is here: SPIP: Standardize SQL logical plans
<https://docs.google.com/document/u/1/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5ace0718&usp=gmail#heading=h.m45webtwxf2d>
.

This vote is for the plan in that doc. The related SPIP with APIs to
create/alter/drop tables will be a separate vote.

Please vote in the next 72 hours:

[+1]: Spark should adopt the SPIP
[-1]: Spark should not adopt the SPIP because . . .

Thanks for voting, everyone!

-- 
Ryan Blue


Re: [DISCUSS] SPIP: Standardize SQL logical plans

2018-07-17 Thread Ryan Blue
I just called a vote on this. I don't think we really need a shepherd if
there's enough interest for a vote to pass.

rb

On Tue, Jul 17, 2018 at 9:00 AM Cody Koeninger  wrote:

> According to
>
> http://spark.apache.org/improvement-proposals.html
>
> the shepherd should be a PMC member, not necessarily the person who
> proposed the SPIP
>
> On Tue, Jul 17, 2018 at 9:13 AM, Wenchen Fan  wrote:
> > I don't know an official answer, but conventionally people who propose
> the
> > SPIP would call the vote and "shepherd" the project. Other people can
> jump
> > in during the development. I'm interested in the new API and like to
> work on
> > it after the vote passes.
> >
> > Thanks,
> > Wenchen
> >
> > On Fri, Jul 13, 2018 at 7:25 AM Ryan Blue  wrote:
> >>
> >> Thanks! I'm all for calling a vote on the SPIP. If I understand the
> >> process correctly, the intent is for a "shepherd" to do it. I'm happy to
> >> call a vote, or feel free if you'd like to play that role.
> >>
> >> Other comments:
> >> * DeleteData API: I completely agree that we need to have a proposal for
> >> it. I think the SQL side is easier because DELETE FROM is already a
> >> statement. We just need to be able to identify v2 tables to use it. I'll
> >> come up with something and send a proposal to the dev list.
> >> * Table create/drop/alter/load API: I think we have agreement around the
> >> proposed DataSourceV2 API, but we need to decide how the public API will
> >> work and how this will fit in with ExternalCatalog (see the other
> thread for
> >> discussion there). Do you think we need to get that entire SPIP approved
> >> before we can start getting the API in? If so, what do you think needs
> to be
> >> decided to get it ready?
> >>
> >> Thanks!
> >>
> >> rb
> >>
> >> On Wed, Jul 11, 2018 at 8:24 PM Wenchen Fan 
> wrote:
> >>>
> >>> Hi Ryan,
> >>>
> >>> Great job on this! Shall we call a vote for the plan standardization
> >>> SPIP? I think this is a good idea and we should do it.
> >>>
> >>> Notes:
> >>> We definitely need new user-facing APIs to produce these new logical
> >>> plans like DeleteData. But we need a design doc for these new APIs
> after the
> >>> SPIP passed.
> >>> We definitely need the data source to provide the ability to
> >>> create/drop/alter/lookup tables, but that belongs to the other SPIP and
> >>> should be voted separately.
> >>>
> >>> Thanks,
> >>> Wenchen
> >>>
> >>> On Fri, Apr 20, 2018 at 5:01 AM Ryan Blue 
> >>> wrote:
> >>>>
> >>>> Hi everyone,
> >>>>
> >>>> A few weeks ago, I wrote up a proposal to standardize SQL logical
> plans
> >>>> and a supporting design doc for data source catalog APIs. From the
> comments
> >>>> on those docs, it looks like we mostly have agreement around
> standardizing
> >>>> plans and around the data source catalog API.
> >>>>
> >>>> We still need to work out details, like the transactional API
> extension,
> >>>> but I'd like to get started implementing those proposals so we have
> >>>> something working for the 2.4.0 release. I'm starting this thread
> because I
> >>>> think we're about ready to vote on the proposal and I'd like to get
> any
> >>>> remaining discussion going or get anyone that missed this to read
> through
> >>>> the docs.
> >>>>
> >>>> Thanks!
> >>>>
> >>>> rb
> >>>>
> >>>> --
> >>>> Ryan Blue
> >>>> Software Engineer
> >>>> Netflix
> >>
> >>
> >>
> >> --
> >> Ryan Blue
> >> Software Engineer
> >> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: [VOTE] SPIP: Standardize SQL logical plans

2018-07-17 Thread Ryan Blue
+1 (not binding)

On Tue, Jul 17, 2018 at 10:59 AM Ryan Blue  wrote:

> Hi everyone,
>
> From discussion on the proposal doc and the discussion thread, I think we
> have consensus around the plan to standardize logical write operations for
> DataSourceV2. I would like to call a vote on the proposal.
>
> The proposal doc is here: SPIP: Standardize SQL logical plans
> <https://docs.google.com/document/u/1/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5ace0718&usp=gmail#heading=h.m45webtwxf2d>
> .
>
> This vote is for the plan in that doc. The related SPIP with APIs to
> create/alter/drop tables will be a separate vote.
>
> Please vote in the next 72 hours:
>
> [+1]: Spark should adopt the SPIP
> [-1]: Spark should not adopt the SPIP because . . .
>
> Thanks for voting, everyone!
>
> --
> Ryan Blue
>


-- 
Ryan Blue


Re: [VOTE] SPARK 2.3.2 (RC3)

2018-07-18 Thread Ryan Blue
gt;>>>>>> at:
>>>>>>> https://dist.apache.org/repos/dist/dev/spark/v2.3.2-rc3-bin/
>>>>>>>
>>>>>>> Signatures used for Spark RCs can be found in this file:
>>>>>>> https://dist.apache.org/repos/dist/dev/spark/KEYS
>>>>>>>
>>>>>>> The staging repository for this release can be found at:
>>>>>>>
>>>>>>> https://repository.apache.org/content/repositories/orgapachespark-1278/
>>>>>>>
>>>>>>> The documentation corresponding to this release can be found at:
>>>>>>> https://dist.apache.org/repos/dist/dev/spark/v2.3.2-rc3-docs/
>>>>>>>
>>>>>>> The list of bug fixes going into 2.3.2 can be found at the following
>>>>>>> URL:
>>>>>>> https://issues.apache.org/jira/projects/SPARK/versions/12343289
>>>>>>>
>>>>>>> Note. RC2 was cancelled because of one blocking issue SPARK-24781
>>>>>>> during release preparation.
>>>>>>>
>>>>>>> FAQ
>>>>>>>
>>>>>>> =
>>>>>>> How can I help test this release?
>>>>>>> =
>>>>>>>
>>>>>>> If you are a Spark user, you can help us test this release by taking
>>>>>>> an existing Spark workload and running on this release candidate,
>>>>>>> then
>>>>>>> reporting any regressions.
>>>>>>>
>>>>>>> If you're working in PySpark you can set up a virtual env and install
>>>>>>> the current RC and see if anything important breaks, in the
>>>>>>> Java/Scala
>>>>>>> you can add the staging repository to your projects resolvers and
>>>>>>> test
>>>>>>> with the RC (make sure to clean up the artifact cache before/after so
>>>>>>> you don't end up building with a out of date RC going forward).
>>>>>>>
>>>>>>> ===
>>>>>>> What should happen to JIRA tickets still targeting 2.3.2?
>>>>>>> ===
>>>>>>>
>>>>>>> The current list of open tickets targeted at 2.3.2 can be found at:
>>>>>>> https://issues.apache.org/jira/projects/SPARK and search for "Target
>>>>>>> Version/s" = 2.3.2
>>>>>>>
>>>>>>> Committers should look at those and triage. Extremely important bug
>>>>>>> fixes, documentation, and API tweaks that impact compatibility should
>>>>>>> be worked on immediately. Everything else please retarget to an
>>>>>>> appropriate release.
>>>>>>>
>>>>>>> ==
>>>>>>> But my bug isn't fixed?
>>>>>>> ==
>>>>>>>
>>>>>>> In order to make timely releases, we will typically not hold the
>>>>>>> release unless the bug in question is a regression from the previous
>>>>>>> release. That being said, if there is something which is a regression
>>>>>>> that has not been correctly targeted please ping me or a committer to
>>>>>>>
>>>>>> help target the issue.
>>>>>>>
>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> John Zhuge
>>>>>>>
>>>>>>

-- 
Ryan Blue
Software Engineer
Netflix


[RESULT] [VOTE] SPIP: Standardize SQL logical plans

2018-07-20 Thread Ryan Blue
This vote passes with 4 binding +1s and 9 community +1s.

Thanks for taking the time to vote, everyone!

Binding votes:
Wenchen Fan
Xiao Li
Reynold Xin
Felix Cheung

Non-binding votes:
Ryan Blue
John Zhuge
Takeshi Yamamuro
Marco Gaido
Russel Spitzer
Alessandro Solimando
Henry Robinson
Dongjoon Hyun
Bruce Robbins


On Wed, Jul 18, 2018 at 4:43 PM Felix Cheung 
wrote:

> +1
>
>
> --
> *From:* Bruce Robbins 
> *Sent:* Wednesday, July 18, 2018 3:02 PM
> *To:* Ryan Blue
> *Cc:* Spark Dev List
> *Subject:* Re: [VOTE] SPIP: Standardize SQL logical plans
>
> +1 (non-binding)
>
> On Tue, Jul 17, 2018 at 10:59 AM, Ryan Blue  wrote:
>
>> Hi everyone,
>>
>> From discussion on the proposal doc and the discussion thread, I think we
>> have consensus around the plan to standardize logical write operations for
>> DataSourceV2. I would like to call a vote on the proposal.
>>
>> The proposal doc is here: SPIP: Standardize SQL logical plans
>> <https://docs.google.com/document/u/1/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5ace0718&usp=gmail#heading=h.m45webtwxf2d>
>> .
>>
>> This vote is for the plan in that doc. The related SPIP with APIs to
>> create/alter/drop tables will be a separate vote.
>>
>> Please vote in the next 72 hours:
>>
>> [+1]: Spark should adopt the SPIP
>> [-1]: Spark should not adopt the SPIP because . . .
>>
>> Thanks for voting, everyone!
>>
>> --
>> Ryan Blue
>>
>
>

-- 
Ryan Blue


[DISCUSS] Multiple catalog support

2018-07-23 Thread Ryan Blue
Lately, I’ve been working on implementing the new SQL logical plans. I’m
currently blocked working on the plans that require table metadata
operations. For example, CTAS will be implemented as a create table and a
write using DSv2 (and a drop table if anything goes wrong). That requires
something to expose the create and drop table actions: a table catalog.

Initially, I opened #21306 <https://github.com/apache/spark/pull/21306> to
get a table catalog from the data source, but that’s a bad idea because it
conflicts with future multi-catalog support. Sources are an implementation
of a read and write API that can be shared between catalogs. For example,
you could have prod and test HMS catalogs that both use the Parquet source.
The Parquet source shouldn’t determine whether a CTAS statement creates a
table in prod or test.

That means that CTAS and other plans for DataSourceV2 need a solution to
determine the catalog to use.
Proposal

I propose we add support for multiple catalogs now in support of the
DataSourceV2 work, to avoid hacky work-arounds.

First, I think we need to add catalog to TableIdentifier so tables are
identified by catalog.db.table, not just db.table. This would make it easy
to specify the intended catalog for SQL statements, like CREATE
cat.db.table AS ..., and in the DataFrame API:
df.write.saveAsTable("cat.db.table") or spark.table("cat.db.table").

Second, we will need an API for catalogs to implement. The SPIP on APIs for
Table Metadata
<https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#>
already proposed the API for create/alter/drop table operations. The only
part that is missing is how to register catalogs instead of using
DataSourceV2 to instantiate them.

I think we should configure catalogs through Spark config properties, like
this:

spark.sql.catalog. = 
spark.sql.catalog.. = 

When a catalog is referenced by name, Spark would instantiate the specified
class using a no-arg constructor. The instance would then be configured by
passing a map of the remaining pairs in the spark.sql.catalog..*
namespace to a configure method with the namespace part removed and an
extra “name” parameter with the catalog name. This would support external
sources like JDBC, which have common options like driver or hostname and
port.
Backward-compatibility

The current spark.catalog / ExternalCatalog would be used when the catalog
element of a TableIdentifier is left blank. That would provide
backward-compatibility. We could optionally allow users to control the
default table catalog with a property.
Relationship between catalogs and data sources

In the proposed table catalog API, actions return a Table object that
exposes the DSv2 ReadSupport and WriteSupport traits. Table catalogs would
share data source implementations by returning Table instances that use the
correct data source. V2 sources would no longer need to be loaded by
reflection; the catalog would be loaded instead.

Tables created using format("source") or USING source in SQL specify the
data source implementation directly. This “format” should be passed to the
source as a table property. The existing ExternalCatalog will need to
implement the new TableCatalog API for v2 sources and would continue to use
the property to determine the table’s data source or format implementation.
Other table catalog implementations would be free to interpret the format
string as they choose or to use it to choose a data source implementation
as in the default catalog.

rb
​
-- 
Ryan Blue
Software Engineer
Netflix


[DISCUSS] SPIP: APIs for Table Metadata Operations

2018-07-24 Thread Ryan Blue
The recently adopted SPIP to standardize logical plans requires a way for
to plug in providers for table metadata operations, so that the new plans
can create and drop tables. I proposed an API to do this in a follow-up SPIP
on APIs for Table Metadata Operations
<https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#>.
This thread is to discuss that proposal.

There are two main parts:

   - A public facing API for creating, altering, and dropping tables
   - An API for catalog implementations to provide the underlying table
   operations

The main need is for the plug-in API, but I included the public one because
there isn’t currently a friendly public API to create tables and I think it
helps to see how both would work together.

Here’s a sample of the proposed public API:

catalog.createTable("db.table")
.addColumn("id", LongType)
.addColumn("data", StringType, nullable=true)
.addColumn("ts", TimestampType)
.partitionBy(day($"ts"))
.config("prop", "val")
.commit()

And here’s a sample of the catalog plug-in API:

Table createTable(
TableIdentifier ident,
StructType schema,
List partitions,
Optional> sortOrder,
Map properties)

Note that this API passes both bucketing and column-based partitioning as
Expressions. This is a generalization that makes it possible for the table
to use the relationship between columns and partitions. In the example
above, data is partitioned by the day of the timestamp field. Because the
expression is passed to the table, the table can use predicates on the
timestamp to filter out partitions without an explicit partition predicate.
There’s more detail in the proposal on this.

The SPIP is for the APIs and does not cover how multiple catalogs would be
exposed. I started a separate discussion thread on how to access multiple
catalogs and maintain compatibility with Spark’s current behavior (how to
get the catalog instance in the above example).

Please use this thread to discuss the proposed APIs. Thanks, everyone!

rb
​
-- 
Ryan Blue
Software Engineer
Netflix


Re: [DISCUSS] Multiple catalog support

2018-07-25 Thread Ryan Blue
Quick update: I've updated my PR to add the table catalog API to implement
this proposal. Here's the PR: https://github.com/apache/spark/pull/21306

On Mon, Jul 23, 2018 at 5:01 PM Ryan Blue  wrote:

> Lately, I’ve been working on implementing the new SQL logical plans. I’m
> currently blocked working on the plans that require table metadata
> operations. For example, CTAS will be implemented as a create table and a
> write using DSv2 (and a drop table if anything goes wrong). That requires
> something to expose the create and drop table actions: a table catalog.
>
> Initially, I opened #21306 <https://github.com/apache/spark/pull/21306>
> to get a table catalog from the data source, but that’s a bad idea because
> it conflicts with future multi-catalog support. Sources are an
> implementation of a read and write API that can be shared between catalogs.
> For example, you could have prod and test HMS catalogs that both use the
> Parquet source. The Parquet source shouldn’t determine whether a CTAS
> statement creates a table in prod or test.
>
> That means that CTAS and other plans for DataSourceV2 need a solution to
> determine the catalog to use.
> Proposal
>
> I propose we add support for multiple catalogs now in support of the
> DataSourceV2 work, to avoid hacky work-arounds.
>
> First, I think we need to add catalog to TableIdentifier so tables are
> identified by catalog.db.table, not just db.table. This would make it
> easy to specify the intended catalog for SQL statements, like CREATE
> cat.db.table AS ..., and in the DataFrame API:
> df.write.saveAsTable("cat.db.table") or spark.table("cat.db.table").
>
> Second, we will need an API for catalogs to implement. The SPIP on APIs
> for Table Metadata
> <https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#>
> already proposed the API for create/alter/drop table operations. The only
> part that is missing is how to register catalogs instead of using
> DataSourceV2 to instantiate them.
>
> I think we should configure catalogs through Spark config properties, like
> this:
>
> spark.sql.catalog. = 
> spark.sql.catalog.. = 
>
> When a catalog is referenced by name, Spark would instantiate the
> specified class using a no-arg constructor. The instance would then be
> configured by passing a map of the remaining pairs in the
> spark.sql.catalog..* namespace to a configure method with the
> namespace part removed and an extra “name” parameter with the catalog name.
> This would support external sources like JDBC, which have common options
> like driver or hostname and port.
> Backward-compatibility
>
> The current spark.catalog / ExternalCatalog would be used when the
> catalog element of a TableIdentifier is left blank. That would provide
> backward-compatibility. We could optionally allow users to control the
> default table catalog with a property.
> Relationship between catalogs and data sources
>
> In the proposed table catalog API, actions return a Table object that
> exposes the DSv2 ReadSupport and WriteSupport traits. Table catalogs
> would share data source implementations by returning Table instances that
> use the correct data source. V2 sources would no longer need to be loaded
> by reflection; the catalog would be loaded instead.
>
> Tables created using format("source") or USING source in SQL specify the
> data source implementation directly. This “format” should be passed to the
> source as a table property. The existing ExternalCatalog will need to
> implement the new TableCatalog API for v2 sources and would continue to
> use the property to determine the table’s data source or format
> implementation. Other table catalog implementations would be free to
> interpret the format string as they choose or to use it to choose a data
> source implementation as in the default catalog.
>
> rb
> ​
> --
> Ryan Blue
> Software Engineer
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

2018-07-26 Thread Ryan Blue
I don’t think that we want to block this work until we have a public and
stable Expression. Like our decision to expose InternalRow, I think that
while this option isn’t great, it at least allows us to move forward. We
can hopefully replace it later.

Also note that the use of Expression is in the plug-in API, not in the
public API. I think that it is easier to expect data source implementations
to handle some instability here. We already use Expression as an option for
push-down in DSv2 so there’s precedent for it. Plus, we need to be able to
pass more complex expressions between the sources and Spark for sorting and
clustering data when it’s written to DSv2 (SPARK-23889
<https://issues.apache.org/jira/browse/SPARK-23889>).

Simple expressions for bucketing and column-based partitions would almost
certainly be stable. We can probably find a trade-off solution to not use
Expression in the TableCatalog API, but we definitely need expressions for
SPARK-23889.

SortOrder would be easier to replace with a more strict class based on only
column data rather than expressions. For #21306
<https://github.com/apache/spark/pull/21306>, I just left it out entirely.
What if I just removed it from the proposal and we can add it later?
​

On Thu, Jul 26, 2018 at 4:32 PM Reynold Xin  wrote:

> Seems reasonable at high level. I don't think we can use Expression's and
> SortOrder's in public APIs though. Those are not meant to be public and can
> break easily across versions.
>
>
> On Tue, Jul 24, 2018 at 9:26 AM Ryan Blue 
> wrote:
>
>> The recently adopted SPIP to standardize logical plans requires a way for
>> to plug in providers for table metadata operations, so that the new plans
>> can create and drop tables. I proposed an API to do this in a follow-up SPIP
>> on APIs for Table Metadata Operations
>> <https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#>.
>> This thread is to discuss that proposal.
>>
>> There are two main parts:
>>
>>- A public facing API for creating, altering, and dropping tables
>>- An API for catalog implementations to provide the underlying table
>>operations
>>
>> The main need is for the plug-in API, but I included the public one
>> because there isn’t currently a friendly public API to create tables and I
>> think it helps to see how both would work together.
>>
>> Here’s a sample of the proposed public API:
>>
>> catalog.createTable("db.table")
>> .addColumn("id", LongType)
>> .addColumn("data", StringType, nullable=true)
>> .addColumn("ts", TimestampType)
>> .partitionBy(day($"ts"))
>> .config("prop", "val")
>> .commit()
>>
>> And here’s a sample of the catalog plug-in API:
>>
>> Table createTable(
>> TableIdentifier ident,
>> StructType schema,
>> List partitions,
>> Optional> sortOrder,
>> Map properties)
>>
>> Note that this API passes both bucketing and column-based partitioning as
>> Expressions. This is a generalization that makes it possible for the table
>> to use the relationship between columns and partitions. In the example
>> above, data is partitioned by the day of the timestamp field. Because the
>> expression is passed to the table, the table can use predicates on the
>> timestamp to filter out partitions without an explicit partition predicate.
>> There’s more detail in the proposal on this.
>>
>> The SPIP is for the APIs and does not cover how multiple catalogs would
>> be exposed. I started a separate discussion thread on how to access
>> multiple catalogs and maintain compatibility with Spark’s current behavior
>> (how to get the catalog instance in the above example).
>>
>> Please use this thread to discuss the proposed APIs. Thanks, everyone!
>>
>> rb
>> ​
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: [DISCUSS] Multiple catalog support

2018-07-29 Thread Ryan Blue
Wenchen, what I'm suggesting is a bit of both of your proposals.

I think that USING should be optional like your first option. USING (or
format(...) in the DF side) should configure the source or implementation,
while the catalog should be part of the table identifier. They serve two
different purposes: configuring the storage within the catalog, and
choosing which catalog to pass create or other calls to. I think that's
pretty much what you suggest in #1. The USING syntax would continue to be
used to configure storage within a catalog.

(Side note: I don't think this needs to be tied to a particular
implementation. We currently use 'parquet' to tell the Spark catalog to use
the Parquet source, but another catalog could also use 'parquet' to store
data in Parquet format without using the Spark built-in source.)

The second option suggests separating the catalog API from data source. In
#21306 , I add the proposed
catalog API and a reflection-based loader like the v1 sources use (and v2
sources have used so far). I think that it makes much more sense to start
with a catalog and then get the data source for operations like CTAS. This
is compatible with the behavior from your point #1: the catalog chooses the
source implementation and USING is optional.

The reason why we considered an API to get a catalog from the source is
because we defined the source API first, but it doesn't make sense to get a
catalog from the data source. Catalogs can share data sources (e.g. prod
and test environments). Plus, it makes more sense to determine the catalog
and then have it return the source implementation because it may require a
specific one, like JDBC or Iceberg would. With standard logical plans we
always know the catalog when creating the plan: either the table identifier
includes an explicit one, or the default catalog is used.

In the PR I mentioned above, the catalog implementation's class is
determined by Spark config properties, so there's no need to use
ServiceLoader and we can use the same implementation class for multiple
catalogs with different configs (e.g. prod and test environments).

Your last point about path-based tables deserves some attention. But, we
also need to define the behavior of path-based tables. Part of what we want
to preserve is flexibility, like how you don't need to alter the schema in
JSON tables, you just write different data. For the path-based syntax, I
suggest looking up source first and using the source if there is one. If
not, then look up the catalog. That way existing tables work, but we can
migrate to catalogs with names that don't conflict.

rb


Re: [DISCUSS] Multiple catalog support

2018-07-31 Thread Ryan Blue
 direction, one problem is that, data source may not be
> a good name anymore, since a data source can provide catalog
> functionalities.
>
> Under the hood, I feel this proposal is very similar to my second
> proposal, except that a catalog implementation must provide a default data
> source/storage, and different rule for looking up tables.
>
>
> On Sun, Jul 29, 2018 at 11:43 PM Ryan Blue  wrote:
>
>> Wenchen, what I'm suggesting is a bit of both of your proposals.
>>
>> I think that USING should be optional like your first option. USING (or
>> format(...) in the DF side) should configure the source or implementation,
>> while the catalog should be part of the table identifier. They serve two
>> different purposes: configuring the storage within the catalog, and
>> choosing which catalog to pass create or other calls to. I think that's
>> pretty much what you suggest in #1. The USING syntax would continue to be
>> used to configure storage within a catalog.
>>
>> (Side note: I don't think this needs to be tied to a particular
>> implementation. We currently use 'parquet' to tell the Spark catalog to use
>> the Parquet source, but another catalog could also use 'parquet' to store
>> data in Parquet format without using the Spark built-in source.)
>>
>> The second option suggests separating the catalog API from data source. In
>>  #21306 <https://github.com/apache/spark/pull/21306>, I add the proposed
>> catalog API and a reflection-based loader like the v1 sources use (and v2
>> sources have used so far). I think that it makes much more sense to
>> start with a catalog and then get the data source for operations like CTAS.
>> This is compatible with the behavior from your point #1: the catalog
>> chooses the source implementation and USING is optional.
>>
>> The reason why we considered an API to get a catalog from the source is
>> because we defined the source API first, but it doesn't make sense to get a
>> catalog from the data source. Catalogs can share data sources (e.g. prod
>> and test environments). Plus, it makes more sense to determine the catalog
>> and then have it return the source implementation because it may require a
>> specific one, like JDBC or Iceberg would. With standard logical plans we
>> always know the catalog when creating the plan: either the table identifier
>> includes an explicit one, or the default catalog is used.
>>
>> In the PR I mentioned above, the catalog implementation's class is
>> determined by Spark config properties, so there's no need to use
>> ServiceLoader and we can use the same implementation class for multiple
>> catalogs with different configs (e.g. prod and test environments).
>>
>> Your last point about path-based tables deserves some attention. But, we
>> also need to define the behavior of path-based tables. Part of what we want
>> to preserve is flexibility, like how you don't need to alter the schema in
>> JSON tables, you just write different data. For the path-based syntax, I
>> suggest looking up source first and using the source if there is one. If
>> not, then look up the catalog. That way existing tables work, but we can
>> migrate to catalogs with names that don't conflict.
>>
>> rb
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

2018-08-13 Thread Ryan Blue
Reynold, did you get a chance to look at my response about using
`Expression`? I think that it's okay since it is already exposed in the v2
data source API. Plus, I wouldn't want to block this on building a public
expression API that is more stable.

I think that's the only objection to this SPIP. Anyone else want to raise
an issue with the proposal, or is it about time to bring up a vote thread?

rb

On Thu, Jul 26, 2018 at 5:00 PM Ryan Blue  wrote:

> I don’t think that we want to block this work until we have a public and
> stable Expression. Like our decision to expose InternalRow, I think that
> while this option isn’t great, it at least allows us to move forward. We
> can hopefully replace it later.
>
> Also note that the use of Expression is in the plug-in API, not in the
> public API. I think that it is easier to expect data source implementations
> to handle some instability here. We already use Expression as an option
> for push-down in DSv2 so there’s precedent for it. Plus, we need to be able
> to pass more complex expressions between the sources and Spark for sorting
> and clustering data when it’s written to DSv2 (SPARK-23889
> <https://issues.apache.org/jira/browse/SPARK-23889>).
>
> Simple expressions for bucketing and column-based partitions would almost
> certainly be stable. We can probably find a trade-off solution to not use
> Expression in the TableCatalog API, but we definitely need expressions for
> SPARK-23889.
>
> SortOrder would be easier to replace with a more strict class based on
> only column data rather than expressions. For #21306
> <https://github.com/apache/spark/pull/21306>, I just left it out
> entirely. What if I just removed it from the proposal and we can add it
> later?
> ​
>
> On Thu, Jul 26, 2018 at 4:32 PM Reynold Xin  wrote:
>
>> Seems reasonable at high level. I don't think we can use Expression's and
>> SortOrder's in public APIs though. Those are not meant to be public and can
>> break easily across versions.
>>
>>
>> On Tue, Jul 24, 2018 at 9:26 AM Ryan Blue 
>> wrote:
>>
>>> The recently adopted SPIP to standardize logical plans requires a way
>>> for to plug in providers for table metadata operations, so that the new
>>> plans can create and drop tables. I proposed an API to do this in a
>>> follow-up SPIP on APIs for Table Metadata Operations
>>> <https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#>.
>>> This thread is to discuss that proposal.
>>>
>>> There are two main parts:
>>>
>>>- A public facing API for creating, altering, and dropping tables
>>>- An API for catalog implementations to provide the underlying table
>>>operations
>>>
>>> The main need is for the plug-in API, but I included the public one
>>> because there isn’t currently a friendly public API to create tables and I
>>> think it helps to see how both would work together.
>>>
>>> Here’s a sample of the proposed public API:
>>>
>>> catalog.createTable("db.table")
>>> .addColumn("id", LongType)
>>> .addColumn("data", StringType, nullable=true)
>>> .addColumn("ts", TimestampType)
>>> .partitionBy(day($"ts"))
>>> .config("prop", "val")
>>> .commit()
>>>
>>> And here’s a sample of the catalog plug-in API:
>>>
>>> Table createTable(
>>> TableIdentifier ident,
>>> StructType schema,
>>> List partitions,
>>> Optional> sortOrder,
>>> Map properties)
>>>
>>> Note that this API passes both bucketing and column-based partitioning
>>> as Expressions. This is a generalization that makes it possible for the
>>> table to use the relationship between columns and partitions. In the
>>> example above, data is partitioned by the day of the timestamp field.
>>> Because the expression is passed to the table, the table can use predicates
>>> on the timestamp to filter out partitions without an explicit partition
>>> predicate. There’s more detail in the proposal on this.
>>>
>>> The SPIP is for the APIs and does not cover how multiple catalogs would
>>> be exposed. I started a separate discussion thread on how to access
>>> multiple catalogs and maintain compatibility with Spark’s current behavior
>>> (how to get the catalog instance in the above example).
>>>
>>> Please use this thread to discuss the proposed APIs. Thanks, everyone!
>>>
>>> rb
>>> ​
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

2018-08-15 Thread Ryan Blue
I agree that it would be great to have a stable public expression API that
corresponds to what is parsed, not the implementations. That would be
great, but I worry that it will get out of date, and a data source that
needs to support a new expression has to wait up to 6 months for a public
release with it.

Another, lesser problem is that this currently blocks CTAS (for creating
partitioned tables) and DeleteSupport (for deleting data by expression) in
the v2 API. Because the new logical plans depend on the CreateTable
<https://github.com/apache/spark/pull/21306> and DeleteSupport
<https://github.com/apache/spark/pull/21308> APIs that need expressions,
they will be delayed.

That's not a bad thing -- I'd much rather get this right -- but I'm
concerned about the rush to move over to v2. With the new plans, **there's
no need to use SaveMode that will introduce unpredictable behavior in v2**,
but others have suggested supporting SaveMode until the new plans are
finished. I think it is fine to support reads and append to new tables for
now, but not having an expression is making that pressure to compromise the
v2 API worse. It's also not clear how this support would be cleanly
removed. (See this thread for context
<https://github.com/apache/spark/pull/21123#issuecomment-412850705>)

I'll start working on an alternative expression API. I think for table
creation we just need a small one for now. We'll need to extend it for
DeleteSupport, though. And, we will need to remove the support in v2 for
pushing Expression; hopefully sooner than later.

rb

On Wed, Aug 15, 2018 at 10:34 AM Reynold Xin  wrote:

> Sorry I completely disagree with using Expression in critical public APIs
> that we expect a lot of developers to use. There's a huge difference
> between exposing InternalRow vs Expression. InternalRow is a relatively
> small surface (still quite large) that I can see ourselves within a version
> getting to a point to make it stable, while Expression is everything in
> Spark SQL, including all the internal implementations, referencing logical
> plans and physical plans (due to subqueries). They weren't designed as
> public APIs, and it is simply not feasible to make them public APIs without
> breaking things all the time. I can however see ourselves creating a
> smaller scope, parallel public expressions API, similar to what we did for
> dsv1.
>
> If we are depending on Expressions on the more common APIs in dsv2
> already, we should revisit that.
>
>
>
>
> On Mon, Aug 13, 2018 at 1:59 PM Ryan Blue  wrote:
>
>> Reynold, did you get a chance to look at my response about using
>> `Expression`? I think that it's okay since it is already exposed in the v2
>> data source API. Plus, I wouldn't want to block this on building a public
>> expression API that is more stable.
>>
>> I think that's the only objection to this SPIP. Anyone else want to raise
>> an issue with the proposal, or is it about time to bring up a vote thread?
>>
>> rb
>>
>> On Thu, Jul 26, 2018 at 5:00 PM Ryan Blue  wrote:
>>
>>> I don’t think that we want to block this work until we have a public and
>>> stable Expression. Like our decision to expose InternalRow, I think
>>> that while this option isn’t great, it at least allows us to move forward.
>>> We can hopefully replace it later.
>>>
>>> Also note that the use of Expression is in the plug-in API, not in the
>>> public API. I think that it is easier to expect data source implementations
>>> to handle some instability here. We already use Expression as an option
>>> for push-down in DSv2 so there’s precedent for it. Plus, we need to be able
>>> to pass more complex expressions between the sources and Spark for sorting
>>> and clustering data when it’s written to DSv2 (SPARK-23889
>>> <https://issues.apache.org/jira/browse/SPARK-23889>).
>>>
>>> Simple expressions for bucketing and column-based partitions would
>>> almost certainly be stable. We can probably find a trade-off solution to
>>> not use Expression in the TableCatalog API, but we definitely need
>>> expressions for SPARK-23889.
>>>
>>> SortOrder would be easier to replace with a more strict class based on
>>> only column data rather than expressions. For #21306
>>> <https://github.com/apache/spark/pull/21306>, I just left it out
>>> entirely. What if I just removed it from the proposal and we can add it
>>> later?
>>> ​
>>>
>>> On Thu, Jul 26, 2018 at 4:32 PM Reynold Xin  wrote:
>>>
>>>> Seems reasonable at high level. I don't think we can use Expression&#

Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

2018-08-15 Thread Ryan Blue
I think I found a good solution to the problem of using Expression in the
TableCatalog API and in the DeleteSupport API.

For DeleteSupport, there is already a stable and public subset of
Expression named Filter that can be used to pass filters. The reason why
DeleteSupport would use Expression is to support more complex expressions
like to_date(ts) = '2018-08-15' that are translated to ts >=
15343164 AND ts < 15344028. But, this can be done in Spark
instead of the data sources so I think DeleteSupport should use Filter
instead. I updated the DeleteSupport PR #21308
 with these changes.

Also, I agree that the DataSourceV2 API should also not expose Expression,
so I opened SPARK-25127 to track removing SupportsPushDownCatalystFilter
.

For TableCatalog, I took a similar approach instead of introducing a
parallel Expression API. Instead, I created a PartitionTransform API (like
Filter) that communicates the transformation function, function parameters
like num buckets, and column references. I updated the TableCatalog PR
#21306  to use
PartitionTransform instead of Expression and I updated the text of the SPIP
doc

.

I also raised a concern about needing to wait for Spark to add support for
new expressions (now partition transforms). To get around this, I added an
apply transform that passes the name of a function and an input column.
That way, users can still pass transforms that Spark doesn’t know about by
name to data sources: apply("source_function", "colName").

Please have a look at the updated pull requests and SPIP doc and comment!

rb


Re: [DISCUSS] USING syntax for Datasource V2

2018-08-20 Thread Ryan Blue
Thanks for posting this discussion to the dev list, it would be great to
hear what everyone thinks about the idea that USING should be a
catalog-specific storage configuration.

Related to this, I’ve updated the catalog PR, #21306
<https://github.com/apache/spark/pull/21306>, to include an implementation
that translates from the v2 TableCatalog API
<https://github.com/apache/spark/pull/21306/files#diff-a9d913d11630b965ef5dd3d3a02ca452>
to the current catalog API. That shows how this would fit together with v1,
at least for the catalog part. This will enable all of the new query plans
to be written to the TableCatalog API, even if they end up using the
default v1 catalog.

On Mon, Aug 20, 2018 at 12:19 AM Hyukjin Kwon  wrote:

> Hi all,
>
> I have been trying to follow `USING` syntax support since that looks
> currently not supported whereas `format` API supports this. I have been
> trying to understand why and talked with Ryan.
>
> Ryan knows all the details and, He and I thought it's good to post here -
> I just started to look into this.
> Here is Ryan's response:
>
>
> >USING is currently used to select the underlying data source
> implementation directly. The string passed in USING or format in the DF
> API is used to resolve an implementation class.
>
> The existing catalog supports tables that specify their datasource
> implementation, but this will not be the case for all catalogs when Spark
> adds multiple catalog support. For example, a Cassandra catalog or a JDBC
> catalog that exposes tables in those systems will definitely not support
> users marking tables with the “parquet” data source. The catalog must have
> the ability to determine the data source implementation. That’s why I think
> it is valuable to think of the current ExternalCatalog as one that can
> track tables with any read/write implementation. Other catalogs can’t and
> won’t do that.
>
> > In the catalog v2 API <https://github.com/apache/spark/pull/21306> I’ve
> proposed, everything from CREATE TABLE is passed to the catalog. Then the
> catalog determines what source to use and returns a Table instance that
> uses some class for its ReadSupport and WriteSupport implementation. An
> ExternalCatalog exposed through that API would receive the USING or format 
> string
> as a table property and would return a Table that uses the correct
> ReadSupport, so tables stored in an ExternalCatalog will work as they do
> today.
>
> > I think other catalogs should be able to choose what to do with the
> USING string. An Iceberg <https://github.com/Netflix/iceberg> catalog
> might use this to determine the underlying file format, which could be
> parquet, orc, or avro. Or, a JDBC catalog might use it for the underlying
> table implementation in the DB. This would make the property more of a
> storage hint for the catalog, which is going to determine the read/write
> implementation anyway.
>
> > For cases where there is no catalog involved, the current plan is to use
> the reflection-based approach from v1 with the USING or format string. In
> v2, that should resolve a ReadSupportProvider, which is used to create a
> ReadSupport directly from options. I think this is a good approach for
> backward-compatibility, but it can’t provide the same features as a
> catalog-based table. Catalogs are how we have decided to build reliable
> behavior for CTAS and the other standard logical plans
> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
> CTAS is a create and then an insert, and a write implementation alone can’t
> provide that create operation.
>
> I was targeting the last case (where there is no catalog involved) in
> particular. I was thinking that approach is also good since `USING` syntax
> compatibility should be kept anyway - this should reduce migration cost as
> well. Was wondering about what you guys think about this.
> If you guys could think the last case should be supported anyway, I was
> thinking we could just orthogonally proceed. If you guys think other issues
> should be resolved first, I think we (at least I will) should take a look
> for the set of catalog APIs.
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: [DISCUSS] USING syntax for Datasource V2

2018-08-21 Thread Ryan Blue
I don’t understand why a Cassandra Catalogue wouldn’t be able to store
metadata references for a parquet table just as a Hive Catalogue can store
references to a C* datastource.

Sorry for the confusion. I’m not talking about a catalog that stores its
information in Cassandra. I’m talking about a catalog that talks directly
to Cassandra to expose Cassandra tables. It wouldn’t make sense because
Cassandra doesn’t store data in Parquet files. The reason you’d want to
have a catalog like this is to maintain a single source of truth for that
metadata instead of using the canonical metadata for a table in the system
that manages that table, and other metadata in Spark’s metastore catalog.

You could certainly build a catalog implementation that stores its data in
Cassandra or JDBC and supports the same tables that Spark does today.
That’s just not what I’m talking about here.

On Mon, Aug 20, 2018 at 7:31 PM Russell Spitzer 
wrote:

> I'm not sure I follow what the discussion topic is here
>
> > For example, a Cassandra catalog or a JDBC catalog that exposes tables
> in those systems will definitely not support users marking tables with the
> “parquet” data source.
>
> I don't understand why a Cassandra Catalogue wouldn't be able to store
> metadata references for a parquet table just as a Hive Catalogue can store
> references to a C* datastource. We currently store HiveMetastore data in a
> C* table and this allows us to store tables with any underlying format even
> though the catalogues' implantation is written in C*.
>
> Is the idea here that a table can't have multiple underlying formats in a
> given catalogue? And the USING can then be used on read to force a
> particular format?
>
> > I think other catalogs should be able to choose what to do with the
> USING string
>
> This makes sense to me, but i'm not sure why any catalogue would want to
> ignore this?
>
> It would be helpful to me to have a few examples written out if that is
> possible with Old Implementation and New Implementation
>
> Thanks for your time,
> Russ
>
> On Mon, Aug 20, 2018 at 11:33 AM Ryan Blue 
> wrote:
>
>> Thanks for posting this discussion to the dev list, it would be great to
>> hear what everyone thinks about the idea that USING should be a
>> catalog-specific storage configuration.
>>
>> Related to this, I’ve updated the catalog PR, #21306
>> <https://github.com/apache/spark/pull/21306>, to include an
>> implementation that translates from the v2 TableCatalog API
>> <https://github.com/apache/spark/pull/21306/files#diff-a9d913d11630b965ef5dd3d3a02ca452>
>> to the current catalog API. That shows how this would fit together with v1,
>> at least for the catalog part. This will enable all of the new query plans
>> to be written to the TableCatalog API, even if they end up using the
>> default v1 catalog.
>>
>> On Mon, Aug 20, 2018 at 12:19 AM Hyukjin Kwon 
>> wrote:
>>
>>> Hi all,
>>>
>>> I have been trying to follow `USING` syntax support since that looks
>>> currently not supported whereas `format` API supports this. I have been
>>> trying to understand why and talked with Ryan.
>>>
>>> Ryan knows all the details and, He and I thought it's good to post here
>>> - I just started to look into this.
>>> Here is Ryan's response:
>>>
>>>
>>> >USING is currently used to select the underlying data source
>>> implementation directly. The string passed in USING or format in the DF
>>> API is used to resolve an implementation class.
>>>
>>> The existing catalog supports tables that specify their datasource
>>> implementation, but this will not be the case for all catalogs when Spark
>>> adds multiple catalog support. For example, a Cassandra catalog or a JDBC
>>> catalog that exposes tables in those systems will definitely not support
>>> users marking tables with the “parquet” data source. The catalog must have
>>> the ability to determine the data source implementation. That’s why I think
>>> it is valuable to think of the current ExternalCatalog as one that can
>>> track tables with any read/write implementation. Other catalogs can’t and
>>> won’t do that.
>>>
>>> > In the catalog v2 API <https://github.com/apache/spark/pull/21306> I’ve
>>> proposed, everything from CREATE TABLE is passed to the catalog. Then
>>> the catalog determines what source to use and returns a Table instance
>>> that uses some class for its ReadSupport and WriteSupport implementation.
>>> An ExternalCatalog exposed through that API would r

Re: data source api v2 refactoring

2018-08-31 Thread Ryan Blue
on executors
>>
>> A streaming micro-batch scan would look like the following:
>>
>> val provider = reflection[Format]("parquet")
>> val table = provider.createTable(options)
>> val stream = table.createStream(scanConfig)
>>
>> while(true) {
>>   val scan = streamingScan.createScan(startOffset)
>>   // run tasks on executors
>> }
>>
>>
>> Vs the current API, the above:
>>
>> 1. Creates an explicit Table abstraction, and an explicit Scan
>> abstraction.
>>
>> 2. Have an explicit Stream level and makes it clear pushdowns and options
>> are handled there, rather than at the individual scan (ReadSupport) level.
>> Data source implementations don't need to worry about pushdowns or options
>> changing mid-stream. For batch, those happen when the scan object is
>> created.
>>
>>
>>
>> This email is just a high level sketch. I've asked Wenchen to prototype
>> this, to see if it is actually feasible and the degree of hacks it removes,
>> or creates.
>>
>>
>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: [discuss] replacing SPIP template with Heilmeier's Catechism?

2018-08-31 Thread Ryan Blue
+1

I think this is a great suggestion. I agree a bit with Sean, but I think it
is really about mapping these questions into some of the existing
structure. These are a great way to think about projects, but they're
general and it would help to rephrase them for a software project, like
Matei's comment on considering cost. Similarly, we might rephrase
objectives to be goals/non-goals and add something to highlight that we
expect absolutely no Jargon. A design sketch is needed to argue how long it
will take, what is new, and why it would be successful; adding these
questions will help people understand how to go from that design sketch to
an argument for that design. I think these will guide people to write
proposals that is persuasive and well-formed.

rb

On Fri, Aug 31, 2018 at 4:17 PM Jules Damji  wrote:

> +1
>
> One could argue that the litany of the questions are really a double-click
> on the essence: why, what, how. The three interrogatives ought to be the
> essence and distillation of any proposal or technical exposition.
>
> Cheers
> Jules
>
> Sent from my iPhone
> Pardon the dumb thumb typos :)
>
> On Aug 31, 2018, at 11:23 AM, Reynold Xin  wrote:
>
> I helped craft the current SPIP template
> <https://spark.apache.org/improvement-proposals.html> last year. I was
> recently (re-)introduced to the Heilmeier Catechism, a set of questions
> DARPA developed to evaluate proposals. The set of questions are:
>
> - What are you trying to do? Articulate your objectives using absolutely
> no jargon.
> - How is it done today, and what are the limits of current practice?
> - What is new in your approach and why do you think it will be successful?
> - Who cares? If you are successful, what difference will it make?
> - What are the risks?
> - How much will it cost?
> - How long will it take?
> - What are the mid-term and final “exams” to check for success?
>
> When I read the above list, it resonates really well because they are
> almost always the same set of questions I ask myself and others before I
> decide whether something is worth doing. In some ways, our SPIP template
> tries to capture some of these (e.g. target persona), but are not as
> explicit and well articulated.
>
> What do people think about replacing the current SPIP template with the
> above?
>
> At a high level, I think the Heilmeier's Catechism emphasizes less about
> the "how", and more the "why" and "what", which is what I'd argue SPIPs
> should be about. The hows should be left in design docs for larger projects.
>
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: data source api v2 refactoring

2018-09-01 Thread Ryan Blue
Thanks for clarifying, Wenchen. I think that's what I expected.

As for the abstraction, here's the way that I think about it: there are two
important parts of a scan: the definition of what will be read, and task
sets that actually perform the read. In batch, there's one definition of
the scan and one task set so it makes sense that there's one scan object
that encapsulates both of these concepts. For streaming, we need to
separate the two into the definition of what will be read (the stream or
streaming read) and the task sets that are run (scans). That way, the
streaming read behaves like a factory for scans, producing scans that
handle the data either in micro-batches or using continuous tasks.

To address Jungtaek's question, I think that this does work with
continuous. In continuous mode, the query operators keep running and send
data to one another directly. The API still needs a streaming read layer
because it may still produce more than one continuous scan. That would
happen when the underlying source changes and Spark needs to reconfigure. I
think the example here is when partitioning in a Kafka topic changes and
Spark needs to re-map Kafka partitions to continuous tasks.

rb

On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan  wrote:

> Hi Ryan,
>
> Sorry I may use a wrong wording. The pushdown is done with ScanConfig,
> which is not table/stream/scan, but something between them. The table
> creates ScanConfigBuilder, and table creates stream/scan with ScanConfig.
> For streaming source, stream is the one to take care of the pushdown
> result. For batch source, it's the scan.
>
> It's a little tricky because stream is an abstraction for streaming source
> only. Better ideas are welcome!
>
> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue  wrote:
>
>> Thanks, Reynold!
>>
>> I think your API sketch looks great. I appreciate having the Table level
>> in the abstraction to plug into as well. I think this makes it clear what
>> everything does, particularly having the Stream level that represents a
>> configured (by ScanConfig) streaming read and can act as a factory for
>> individual batch scans or for continuous scans.
>>
>> Wenchen, I'm not sure what you mean by doing pushdown at the table level.
>> It seems to mean that pushdown is specific to a batch scan or streaming
>> read, which seems to be what you're saying as well. Wouldn't the pushdown
>> happen to create a ScanConfig, which is then used as Reynold suggests?
>> Looking forward to seeing this PR when you get it posted. Thanks for all of
>> your work on this!
>>
>> rb
>>
>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan 
>> wrote:
>>
>>> Thank Reynold for writing this and starting the discussion!
>>>
>>> Data source v2 was started with batch only, so we didn't pay much
>>> attention to the abstraction and just follow the v1 API. Now we are
>>> designing the streaming API and catalog integration, the abstraction
>>> becomes super important.
>>>
>>> I like this proposed abstraction and have successfully prototyped it to
>>> make sure it works.
>>>
>>> During prototyping, I have to work around the issue that the current
>>> streaming engine does query optimization/planning for each micro batch.
>>> With this abstraction, the operator pushdown is only applied once
>>> per-query. In my prototype, I do the physical planning up front to get the
>>> pushdown result, and
>>> add a logical linking node that wraps the resulting physical plan node
>>> for the data source, and then swap that logical linking node into the
>>> logical plan for each batch. In the future we should just let the streaming
>>> engine do query optimization/planning only once.
>>>
>>> About pushdown, I think we should do it at the table level. The table
>>> should create a new pushdow handler to apply operator pushdowm for each
>>> scan/stream, and create the scan/stream with the pushdown result. The
>>> rationale is, a table should have the same pushdown behavior regardless the
>>> scan node.
>>>
>>> Thanks,
>>> Wenchen
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin  wrote:
>>>
>>>> I spent some time last week looking at the current data source v2 apis,
>>>> and I thought we should be a bit more buttoned up in terms of the
>>>> abstractions and the guarantees Spark provides. In particular, I feel we
>>>> need the following levels of "abstractions", to fit the use cases in Spark,
>>>&g

Fwd: data source api v2 refactoring

2018-09-04 Thread Ryan Blue
Latest from Wenchen in case it was dropped.

-- Forwarded message -
From: Wenchen Fan 
Date: Mon, Sep 3, 2018 at 6:16 AM
Subject: Re: data source api v2 refactoring
To: 
Cc: Ryan Blue , Reynold Xin , <
dev@spark.apache.org>


Hi Mridul,

I'm not sure what's going on, my email was CC'ed to the dev list.


Hi Ryan,

The logical and physical scan idea sounds good. To add more color
to Jungtaek's question, both micro-batch and continuous mode have
the logical and physical scan, but there is a difference: for micro-batch
mode, a physical scan outputs data for one epoch, but it's not true for
continuous mode.

I'm not sure if it's necessary to include streaming epoch in the API
abstraction, for features like metrics reporting.

On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan 
wrote:

>
> Is it only me or are all others getting Wenchen’s mails ? (Obviously Ryan
> did :-) )
> I did not see it in the mail thread I received or in archives ... [1]
> Wondering which othersenderswere getting dropped (if yes).
>
> Regards
> Mridul
>
> [1]
> http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>
>
> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue 
> wrote:
>
>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>
>> As for the abstraction, here's the way that I think about it: there are
>> two important parts of a scan: the definition of what will be read, and
>> task sets that actually perform the read. In batch, there's one definition
>> of the scan and one task set so it makes sense that there's one scan object
>> that encapsulates both of these concepts. For streaming, we need to
>> separate the two into the definition of what will be read (the stream or
>> streaming read) and the task sets that are run (scans). That way, the
>> streaming read behaves like a factory for scans, producing scans that
>> handle the data either in micro-batches or using continuous tasks.
>>
>> To address Jungtaek's question, I think that this does work with
>> continuous. In continuous mode, the query operators keep running and send
>> data to one another directly. The API still needs a streaming read layer
>> because it may still produce more than one continuous scan. That would
>> happen when the underlying source changes and Spark needs to reconfigure. I
>> think the example here is when partitioning in a Kafka topic changes and
>> Spark needs to re-map Kafka partitions to continuous tasks.
>>
>> rb
>>
>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan 
>> wrote:
>>
>>> Hi Ryan,
>>>
>>> Sorry I may use a wrong wording. The pushdown is done with ScanConfig,
>>> which is not table/stream/scan, but something between them. The table
>>> creates ScanConfigBuilder, and table creates stream/scan with ScanConfig.
>>> For streaming source, stream is the one to take care of the pushdown
>>> result. For batch source, it's the scan.
>>>
>>> It's a little tricky because stream is an abstraction for streaming
>>> source only. Better ideas are welcome!
>>>
>>
>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue  wrote:
>>>
>>>> Thanks, Reynold!
>>>>
>>>> I think your API sketch looks great. I appreciate having the Table
>>>> level in the abstraction to plug into as well. I think this makes it clear
>>>> what everything does, particularly having the Stream level that represents
>>>> a configured (by ScanConfig) streaming read and can act as a factory for
>>>> individual batch scans or for continuous scans.
>>>>
>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table
>>>> level. It seems to mean that pushdown is specific to a batch scan or
>>>> streaming read, which seems to be what you're saying as well. Wouldn't the
>>>> pushdown happen to create a ScanConfig, which is then used as Reynold
>>>> suggests? Looking forward to seeing this PR when you get it posted. Thanks
>>>> for all of your work on this!
>>>>
>>>> rb
>>>>
>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan 
>>>> wrote:
>>>>
>>>>> Thank Reynold for writing this and starting the discussion!
>>>>>
>>>>> Data source v2 was started with batch only, so we didn't pay much
>>>>> attention to the abstraction and just follow the v1 API. Now we are
>>>>> designing the streaming API and catalog i

Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

2018-09-04 Thread Ryan Blue
Thanks for posting the summary. I'm strongly in favor of option 1.

I think that API footprint is fairly small, but worth it. Not only does it
make sources easier to implement by handling parsing, it also makes sources
more reliable because Spark handles validation the same way across sources.

A good example is making sure that the referenced columns exist in the
table, which should be done using the case sensitivity of the analyzer.
Spark would pass normalized column names that match the case of the
declared columns to ensure that there isn't a problem if Spark is case
insensitive but the source doesn't implement it. And the source wouldn't
have to know about Spark's case sensitivity settings at all.

On Tue, Sep 4, 2018 at 7:46 PM Reynold Xin  wrote:

> Ryan, Michael and I discussed this offline today. Some notes here:
>
> His use case is to support partitioning data by derived columns, rather
> than physical columns, because he didn't want his users to keep adding the
> "date" column when in reality they are purely derived from some timestamp
> column. We reached consensus on this is a great use case and something we
> should support.
>
> We are still debating how to do this at API level. Two options:
>
> *Option 1.* Create a smaller surfaced, parallel Expression library, and
> use that for specifying partition columns. The bare minimum class hierarchy
> would look like:
>
> trait Expression
>
> class NamedFunction(name: String, args: Seq[Expression]) extends Expression
>
> class Literal(value: Any) extends Expression
>
> class ColumnReference(name: String) extends Expression
>
> These classes don't define how the expressions are evaluated, and it'd be
> up to the data sources to interpret them. As an example, for a table
> partitioned by date(ts), Spark would pass the following to the underlying
> ds:
>
> NamedFunction("date", ColumnReference("timestamp") :: Nil)
>
>
> *Option 2.* Spark passes strings over to the data sources. For the above
> example, Spark simply passes "date(ts)" as a string over.
>
>
> The pros/cons of 1 vs 2 are basically the inverse of each other. Option 1
> creates more rigid structure, with extra complexity in API design. Option 2
> is less structured but more flexible. Option 1 gives Spark the opportunity
> to enforce column references are valid (but not the actual function names),
> whereas option 2 would be up to the data sources to validate.
>
>
>
> On Wed, Aug 15, 2018 at 2:27 PM Ryan Blue  wrote:
>
>> I think I found a good solution to the problem of using Expression in the
>> TableCatalog API and in the DeleteSupport API.
>>
>> For DeleteSupport, there is already a stable and public subset of
>> Expression named Filter that can be used to pass filters. The reason why
>> DeleteSupport would use Expression is to support more complex expressions
>> like to_date(ts) = '2018-08-15' that are translated to ts >=
>> 15343164 AND ts < 15344028. But, this can be done in
>> Spark instead of the data sources so I think DeleteSupport should use
>> Filter instead. I updated the DeleteSupport PR #21308
>> <https://github.com/apache/spark/pull/21308> with these changes.
>>
>> Also, I agree that the DataSourceV2 API should also not expose
>> Expression, so I opened SPARK-25127 to track removing
>> SupportsPushDownCatalystFilter
>> <https://issues.apache.org/jira/browse/SPARK-25127>.
>>
>> For TableCatalog, I took a similar approach instead of introducing a
>> parallel Expression API. Instead, I created a PartitionTransform API (like
>> Filter) that communicates the transformation function, function parameters
>> like num buckets, and column references. I updated the TableCatalog PR
>> #21306 <https://github.com/apache/spark/pull/21306> to use
>> PartitionTransform instead of Expression and I updated the text of the SPIP
>> doc
>> <https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#heading=h.m45webtwxf2d>
>> .
>>
>> I also raised a concern about needing to wait for Spark to add support
>> for new expressions (now partition transforms). To get around this, I added
>> an apply transform that passes the name of a function and an input
>> column. That way, users can still pass transforms that Spark doesn’t know
>> about by name to data sources: apply("source_function", "colName").
>>
>> Please have a look at the updated pull requests and SPIP doc and comment!
>>
>> rb
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: time for Apache Spark 3.0?

2018-09-06 Thread Ryan Blue
work on
>>>>>> Spark 3.0 in 2018.
>>>>>> >>>
>>>>>> >>> 2. Spark’s versioning policy promises that Spark does not break
>>>>>> stable APIs in feature releases (e.g. 2.1, 2.2). API breaking changes are
>>>>>> sometimes a necessary evil, and can be done in major releases (e.g. 1.6 
>>>>>> to
>>>>>> 2.0, 2.x to 3.0).
>>>>>> >>>
>>>>>> >>> 3. That said, a major version isn’t necessarily the playground
>>>>>> for disruptive API changes to make it painful for users to update. The 
>>>>>> main
>>>>>> purpose of a major release is an opportunity to fix things that are 
>>>>>> broken
>>>>>> in the current API and remove certain deprecated APIs.
>>>>>> >>>
>>>>>> >>> 4. Spark as a project has a culture of evolving architecture and
>>>>>> developing major new features incrementally, so major releases are not 
>>>>>> the
>>>>>> only time for exciting new features. For example, the bulk of the work in
>>>>>> the move towards the DataFrame API was done in Spark 1.3, and Continuous
>>>>>> Processing was introduced in Spark 2.3. Both were feature releases rather
>>>>>> than major releases.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> You can find more background in the thread discussing Spark 2.0:
>>>>>> http://apache-spark-developers-list.1001551.n3.nabble.com/A-proposal-for-Spark-2-0-td15122.html
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> The primary motivating factor IMO for a major version bump is to
>>>>>> support Scala 2.12, which requires minor API breaking changes to Spark’s
>>>>>> APIs. Similar to Spark 2.0, I think there are also opportunities for 
>>>>>> other
>>>>>> changes that we know have been biting us for a long time but can’t be
>>>>>> changed in feature releases (to be clear, I’m actually not sure they are
>>>>>> all good ideas, but I’m writing them down as candidates for 
>>>>>> consideration):
>>>>>> >>>
>>>>>> >>> 1. Support Scala 2.12.
>>>>>> >>>
>>>>>> >>> 2. Remove interfaces, configs, and modules (e.g. Bagel)
>>>>>> deprecated in Spark 2.x.
>>>>>> >>>
>>>>>> >>> 3. Shade all dependencies.
>>>>>> >>>
>>>>>> >>> 4. Change the reserved keywords in Spark SQL to be more ANSI-SQL
>>>>>> compliant, to prevent users from shooting themselves in the foot, e.g.
>>>>>> “SELECT 2 SECOND” -- is “SECOND” an interval unit or an alias? To make it
>>>>>> less painful for users to upgrade here, I’d suggest creating a flag for
>>>>>> backward compatibility mode.
>>>>>> >>>
>>>>>> >>> 5. Similar to 4, make our type coercion rule in DataFrame/SQL
>>>>>> more standard compliant, and have a flag for backward compatibility.
>>>>>> >>>
>>>>>> >>> 6. Miscellaneous other small changes documented in JIRA already
>>>>>> (e.g. “JavaPairRDD flatMapValues requires function returning Iterable, 
>>>>>> not
>>>>>> Iterator”, “Prevent column name duplication in temporary view”).
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Now the reality of a major version bump is that the world often
>>>>>> thinks in terms of what exciting features are coming. I do think there 
>>>>>> are
>>>>>> a number of major changes happening already that can be part of the 3.0
>>>>>> release, if they make it in:
>>>>>> >>>
>>>>>> >>> 1. Scala 2.12 support (listing it twice)
>>>>>> >>> 2. Continuous Processing non-experimental
>>>>>> >>> 3. Kubernetes support non-experimental
>>>>>> >>> 4. A more flushed out version of data source API v2 (I don’t
>>>>>> think it is realistic to stabilize that in one release)
>>>>>> >>> 5. Hadoop 3.0 support
>>>>>> >>> 6. ...
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Similar to the 2.0 discussion, this thread should focus on the
>>>>>> framework and whether it’d make sense to create Spark 3.0 as the next
>>>>>> release, rather than the individual feature requests. Those are important
>>>>>> but are best done in their own separate threads.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Vaquar Khan
>>>> +1 -224-436-0783
>>>> Greater Chicago
>>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>> Vaquar Khan
>>> +1 -224-436-0783
>>> Greater Chicago
>>>
>>
>>
>>
>> --
>> Regards,
>> Vaquar Khan
>> +1 -224-436-0783
>> Greater Chicago
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: time for Apache Spark 3.0?

2018-09-06 Thread Ryan Blue
I definitely support moving to 3.0 to remove deprecations and update
dependencies.

For the v2 work, we know that there will be a major API changes and
standardization of behavior from the new logical plans going into the next
release. I think it is a safe bet that this isn’t going to be completely
done for the next release, so it will still be experimental or unstable for
3.0. I also expect that there will be some things that we want to
deprecate. Ideally, that deprecation could happen before a major release so
we can remove it.

I don’t have a problem releasing 3.0 with an unstable v2 API or targeting
4.0 to remove behavior and APIs replaced by v2. But, I want to make sure we
consider it when deciding what the next release should be.

It is probably better to release 3.0 now because it isn’t clear when the v2
API will become stable. And if we choose to release 3.0 next, we should
*not* aim to stabilize v2 for that release. Not that we shouldn’t try to
make it stable as soon as possible, I just think that it is unlikely to
happen in time and we should not rush to claim it is stable.

rb

On Thu, Sep 6, 2018 at 9:31 AM Sean Owen  wrote:

> I think this doesn't necessarily mean 3.0 is coming soon (thoughts on
> timing? 6 months?) but simply next. Do you mean you'd prefer that change to
> happen before 3.x? if it's a significant change, seems reasonable for a
> major version bump rather than minor. Is the concern that tying it to 3.0
> means you have to take a major version update to get it?
>
> I generally support moving on to 3.x so we can also jettison a lot of
> older dependencies, code, fix some long standing issues, etc.
>
> (BTW Scala 2.12 support, mentioned in the OP, will go in for 2.4)
>
> On Thu, Sep 6, 2018 at 9:10 AM Ryan Blue 
> wrote:
>
>> My concern is that the v2 data source API is still evolving and not very
>> close to stable. I had hoped to have stabilized the API and behaviors for a
>> 3.0 release. But we could also wait on that for a 4.0 release, depending on
>> when we think that will be.
>>
>> Unless there is a pressing need to move to 3.0 for some other area, I
>> think it would be better for the v2 sources to have a 2.5 release.
>>
>> On Thu, Sep 6, 2018 at 8:59 AM Xiao Li  wrote:
>>
>>> Yesterday, the 2.4 branch was created. Based on the above discussion, I
>>> think we can bump the master branch to 3.0.0-SNAPSHOT. Any concern?
>>>
>>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: time for Apache Spark 3.0?

2018-09-06 Thread Ryan Blue
It would be great to get more features out incrementally. For experimental
features, do we have more relaxed constraints?

On Thu, Sep 6, 2018 at 9:47 AM Reynold Xin  wrote:

> +1 on 3.0
>
> Dsv2 stable can still evolve in across major releases. DataFrame, Dataset,
> dsv1 and a lot of other major features all were developed throughout the
> 1.x and 2.x lines.
>
> I do want to explore ways for us to get dsv2 incremental changes out there
> more frequently, to get feedback. Maybe that means we apply additive
> changes to 2.4.x; maybe that means making another 2.5 release sooner. I
> will start a separate thread about it.
>
>
>
> On Thu, Sep 6, 2018 at 9:31 AM Sean Owen  wrote:
>
>> I think this doesn't necessarily mean 3.0 is coming soon (thoughts on
>> timing? 6 months?) but simply next. Do you mean you'd prefer that change to
>> happen before 3.x? if it's a significant change, seems reasonable for a
>> major version bump rather than minor. Is the concern that tying it to 3.0
>> means you have to take a major version update to get it?
>>
>> I generally support moving on to 3.x so we can also jettison a lot of
>> older dependencies, code, fix some long standing issues, etc.
>>
>> (BTW Scala 2.12 support, mentioned in the OP, will go in for 2.4)
>>
>> On Thu, Sep 6, 2018 at 9:10 AM Ryan Blue 
>> wrote:
>>
>>> My concern is that the v2 data source API is still evolving and not very
>>> close to stable. I had hoped to have stabilized the API and behaviors for a
>>> 3.0 release. But we could also wait on that for a 4.0 release, depending on
>>> when we think that will be.
>>>
>>> Unless there is a pressing need to move to 3.0 for some other area, I
>>> think it would be better for the v2 sources to have a 2.5 release.
>>>
>>> On Thu, Sep 6, 2018 at 8:59 AM Xiao Li  wrote:
>>>
>>>> Yesterday, the 2.4 branch was created. Based on the above discussion, I
>>>> think we can bump the master branch to 3.0.0-SNAPSHOT. Any concern?
>>>>
>>>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: data source api v2 refactoring

2018-09-06 Thread Ryan Blue
Wenchen,

I'm not really sure what you're proposing here. What is a `LogicalWrite`?
Is it something that mirrors the read side in your PR?

I think that I agree that if we have a Write independent of the Table that
carries the commit and abort methods, then we can create it directly
without a WriteConfig. So I tentatively agree with what you propose,
assuming that I understand it correctly.

rb

On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan  wrote:

> I'm switching to my another Gmail account, let's see if it still gets
> dropped this time.
>
> Hi Ryan,
>
> I'm thinking about the write path and feel the abstraction should be the
> same.
>
> We still have logical and physical writing. And the table can create
> different logical writing based on how to write. e.g., append, delete,
> replaceWhere, etc.
>
> One thing I'm not sure about is the WriteConfig. With the WriteConfig, the
> API would look like
> trait Table {
>   WriteConfig newAppendWriteConfig();
>
>   WriteConfig newDeleteWriteConfig(deleteExprs);
>
>   LogicalWrite newLogicalWrite(writeConfig);
> }
>
> Without WriteConfig, the API looks like
> trait Table {
>   LogicalWrite newAppendWrite();
>
>   LogicalWrite newDeleteWrite(deleteExprs);
> }
>
>
> It looks to me that the API is simpler without WriteConfig, what do you
> think?
>
> Thanks,
> Wenchen
>
> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue 
> wrote:
>
>> Latest from Wenchen in case it was dropped.
>>
>> -- Forwarded message -
>> From: Wenchen Fan 
>> Date: Mon, Sep 3, 2018 at 6:16 AM
>> Subject: Re: data source api v2 refactoring
>> To: 
>> Cc: Ryan Blue , Reynold Xin , <
>> dev@spark.apache.org>
>>
>>
>> Hi Mridul,
>>
>> I'm not sure what's going on, my email was CC'ed to the dev list.
>>
>>
>> Hi Ryan,
>>
>> The logical and physical scan idea sounds good. To add more color
>> to Jungtaek's question, both micro-batch and continuous mode have
>> the logical and physical scan, but there is a difference: for micro-batch
>> mode, a physical scan outputs data for one epoch, but it's not true for
>> continuous mode.
>>
>> I'm not sure if it's necessary to include streaming epoch in the API
>> abstraction, for features like metrics reporting.
>>
>> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan 
>> wrote:
>>
>>>
>>> Is it only me or are all others getting Wenchen’s mails ? (Obviously
>>> Ryan did :-) )
>>> I did not see it in the mail thread I received or in archives ... [1]
>>> Wondering which othersenderswere getting dropped (if yes).
>>>
>>> Regards
>>> Mridul
>>>
>>> [1]
>>> http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>>>
>>>
>>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue 
>>> wrote:
>>>
>>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>>>
>>>> As for the abstraction, here's the way that I think about it: there are
>>>> two important parts of a scan: the definition of what will be read, and
>>>> task sets that actually perform the read. In batch, there's one definition
>>>> of the scan and one task set so it makes sense that there's one scan object
>>>> that encapsulates both of these concepts. For streaming, we need to
>>>> separate the two into the definition of what will be read (the stream or
>>>> streaming read) and the task sets that are run (scans). That way, the
>>>> streaming read behaves like a factory for scans, producing scans that
>>>> handle the data either in micro-batches or using continuous tasks.
>>>>
>>>> To address Jungtaek's question, I think that this does work with
>>>> continuous. In continuous mode, the query operators keep running and send
>>>> data to one another directly. The API still needs a streaming read layer
>>>> because it may still produce more than one continuous scan. That would
>>>> happen when the underlying source changes and Spark needs to reconfigure. I
>>>> think the example here is when partitioning in a Kafka topic changes and
>>>> Spark needs to re-map Kafka partitions to continuous tasks.
>>>>
>>>> rb
>>>>
>>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan 
>>>> wrote:
>>>>
>>>>> Hi Ryan,
>>>>>
>

Re: time for Apache Spark 3.0?

2018-09-06 Thread Ryan Blue
I meant flexibility beyond the point releases. I think what Reynold was
suggesting was getting v2 code out more often than the point releases every
6 months. An Evolving API can change in point releases, but maybe we should
move v2 to Unstable so it can change more often? I don't really see another
way to get changes out more often.

On Thu, Sep 6, 2018 at 11:07 AM Mark Hamstra 
wrote:

> Yes, that is why we have these annotations in the code and the
> corresponding labels appearing in the API documentation:
> https://github.com/apache/spark/blob/master/common/tags/src/main/java/org/apache/spark/annotation/InterfaceStability.java
>
> As long as it is properly annotated, we can change or even eliminate an
> API method before the next major release. And frankly, we shouldn't be
> contemplating bringing in the DS v2 API (and, I'd argue, *any* new API)
> without such an annotation. There is just too much risk of not getting
> everything right before we see the results of the new API being more widely
> used, and too much cost in maintaining until the next major release
> something that we come to regret for us to create new API in a fully frozen
> state.
>
>
> On Thu, Sep 6, 2018 at 9:49 AM Ryan Blue 
> wrote:
>
>> It would be great to get more features out incrementally. For
>> experimental features, do we have more relaxed constraints?
>>
>> On Thu, Sep 6, 2018 at 9:47 AM Reynold Xin  wrote:
>>
>>> +1 on 3.0
>>>
>>> Dsv2 stable can still evolve in across major releases. DataFrame,
>>> Dataset, dsv1 and a lot of other major features all were developed
>>> throughout the 1.x and 2.x lines.
>>>
>>> I do want to explore ways for us to get dsv2 incremental changes out
>>> there more frequently, to get feedback. Maybe that means we apply additive
>>> changes to 2.4.x; maybe that means making another 2.5 release sooner. I
>>> will start a separate thread about it.
>>>
>>>
>>>
>>> On Thu, Sep 6, 2018 at 9:31 AM Sean Owen  wrote:
>>>
>>>> I think this doesn't necessarily mean 3.0 is coming soon (thoughts on
>>>> timing? 6 months?) but simply next. Do you mean you'd prefer that change to
>>>> happen before 3.x? if it's a significant change, seems reasonable for a
>>>> major version bump rather than minor. Is the concern that tying it to 3.0
>>>> means you have to take a major version update to get it?
>>>>
>>>> I generally support moving on to 3.x so we can also jettison a lot of
>>>> older dependencies, code, fix some long standing issues, etc.
>>>>
>>>> (BTW Scala 2.12 support, mentioned in the OP, will go in for 2.4)
>>>>
>>>> On Thu, Sep 6, 2018 at 9:10 AM Ryan Blue 
>>>> wrote:
>>>>
>>>>> My concern is that the v2 data source API is still evolving and not
>>>>> very close to stable. I had hoped to have stabilized the API and behaviors
>>>>> for a 3.0 release. But we could also wait on that for a 4.0 release,
>>>>> depending on when we think that will be.
>>>>>
>>>>> Unless there is a pressing need to move to 3.0 for some other area, I
>>>>> think it would be better for the v2 sources to have a 2.5 release.
>>>>>
>>>>> On Thu, Sep 6, 2018 at 8:59 AM Xiao Li  wrote:
>>>>>
>>>>>> Yesterday, the 2.4 branch was created. Based on the above discussion,
>>>>>> I think we can bump the master branch to 3.0.0-SNAPSHOT. Any concern?
>>>>>>
>>>>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: data source api v2 refactoring

2018-09-07 Thread Ryan Blue
There are a few v2-related changes that we can work in parallel, at least
for reviews:

* SPARK-25006, #21978 <https://github.com/apache/spark/pull/21978>: Add
catalog to TableIdentifier - this proposes how to incrementally add
multi-catalog support without breaking existing code paths
* SPARK-24253, #21308 <https://github.com/apache/spark/pull/21308>: Add
DeleteSupport API - this is a small API addition, which doesn't affect the
refactor
* SPARK-24252, #21306 <https://github.com/apache/spark/pull/21306>: Add v2
Catalog API - this is a different way to create v2 tables, also doesn't
affect the refactor

I agree that the PR for adding SQL support should probably wait on the
refactor. I have also been meaning to share our implementation, which isn't
based on the refactor. It handles CTAS, RTAS, InsertInto, DeleteFrom, and
AlterTable from both SQL and the other methods in the DF API, saveAsTable
and insertInto. It follows the structure that I proposed on the SQL support
PR to convert SQL plans to v2 plans and uses the new TableCatalog to
implement CTAS and RTAS.

rb


On Fri, Sep 7, 2018 at 12:27 AM Wenchen Fan  wrote:

> Hi Ryan,
>
> You are right that the `LogicalWrite` mirrors the read side API. I just
> don't have a good naming yet, and write side changes will be a different PR.
>
>
> Hi Hyukjin,
>
> That's my expectation, otherwise we keep rebasing the refactor PR and
> never get it done.
>
> On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon  wrote:
>
>> BTW, do we hold Datasource V2 related PRs for now until we finish this
>> refactoring just for clarification?
>>
>> 2018년 9월 7일 (금) 오전 12:52, Ryan Blue 님이 작성:
>>
>>> Wenchen,
>>>
>>> I'm not really sure what you're proposing here. What is a
>>> `LogicalWrite`? Is it something that mirrors the read side in your PR?
>>>
>>> I think that I agree that if we have a Write independent of the Table
>>> that carries the commit and abort methods, then we can create it directly
>>> without a WriteConfig. So I tentatively agree with what you propose,
>>> assuming that I understand it correctly.
>>>
>>> rb
>>>
>>> On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan  wrote:
>>>
>>>> I'm switching to my another Gmail account, let's see if it still gets
>>>> dropped this time.
>>>>
>>>> Hi Ryan,
>>>>
>>>> I'm thinking about the write path and feel the abstraction should be
>>>> the same.
>>>>
>>>> We still have logical and physical writing. And the table can create
>>>> different logical writing based on how to write. e.g., append, delete,
>>>> replaceWhere, etc.
>>>>
>>>> One thing I'm not sure about is the WriteConfig. With the WriteConfig,
>>>> the API would look like
>>>> trait Table {
>>>>   WriteConfig newAppendWriteConfig();
>>>>
>>>>   WriteConfig newDeleteWriteConfig(deleteExprs);
>>>>
>>>>   LogicalWrite newLogicalWrite(writeConfig);
>>>> }
>>>>
>>>> Without WriteConfig, the API looks like
>>>> trait Table {
>>>>   LogicalWrite newAppendWrite();
>>>>
>>>>   LogicalWrite newDeleteWrite(deleteExprs);
>>>> }
>>>>
>>>>
>>>> It looks to me that the API is simpler without WriteConfig, what do you
>>>> think?
>>>>
>>>> Thanks,
>>>> Wenchen
>>>>
>>>> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue 
>>>> wrote:
>>>>
>>>>> Latest from Wenchen in case it was dropped.
>>>>>
>>>>> -- Forwarded message -
>>>>> From: Wenchen Fan 
>>>>> Date: Mon, Sep 3, 2018 at 6:16 AM
>>>>> Subject: Re: data source api v2 refactoring
>>>>> To: 
>>>>> Cc: Ryan Blue , Reynold Xin ,
>>>>> 
>>>>>
>>>>>
>>>>> Hi Mridul,
>>>>>
>>>>> I'm not sure what's going on, my email was CC'ed to the dev list.
>>>>>
>>>>>
>>>>> Hi Ryan,
>>>>>
>>>>> The logical and physical scan idea sounds good. To add more color
>>>>> to Jungtaek's question, both micro-batch and continuous mode have
>>>>> the logical and physical scan, but there is a difference: for micro-batch
>>>>> mode, a physical scan outputs data for one epoch, but it's not true for
>

Re: Branch 2.4 is cut

2018-09-09 Thread Ryan Blue
Wenchen, can you hold off on the first RC?

The half-finished changes from the redesign of the DataSourceV2 API are in
master, added in SPARK-24882 <https://github.com/apache/spark/pull/22009>,
and are now in the 2.4 branch. We've had a lot of good discussion since
that PR was merged to update and fix the design, plus only one of the
follow-ups on SPARK-25186
<https://issues.apache.org/jira/browse/SPARK-25186> is done. Clearly, the
redesign was too large to get into 2.4 in so little time -- it was proposed
about 10 days before the original branch date -- and I don't think it is a
good idea to release half-finished major changes.

The easiest solution is to revert SPARK-24882 in the release branch. That
way we have minor changes in 2.4 and major changes in the next release,
instead of major changes in both. What does everyone think?

rb

On Fri, Sep 7, 2018 at 10:37 AM shane knapp  wrote:

> ++joshrosen  (thanks for the help w/deploying the jenkins configs)
>
> the basic 2.4 builds are deployed and building!
>
> i haven't created (a) build(s) yet for scala 2.12...  i'll be coordinating
> this w/the databricks folks next week.
>
> On Fri, Sep 7, 2018 at 9:53 AM, Dongjoon Hyun 
> wrote:
>
>> Thank you, Shane! :D
>>
>> Bests,
>> Dongjoon.
>>
>> On Fri, Sep 7, 2018 at 9:51 AM shane knapp  wrote:
>>
>>> i'll try and get to the 2.4 branch stuff today...
>>>
>>>
>
>
> --
> Shane Knapp
> UC Berkeley EECS Research / RISELab Staff Technical Lead
> https://rise.cs.berkeley.edu
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Ryan Blue
Ross, I think the intent is to create a single transaction on the driver,
write as part of it in each task, and then commit the transaction once the
tasks complete. Is that possible in your implementation?

I think that part of this is made more difficult by not having a clear
starting point for a write, which we are fixing in the redesign of the v2
API. That will have a method that creates a Write to track the operation.
That can create your transaction when it is created and commit the
transaction when commit is called on it.

rb

On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin  wrote:

> Typically people do it via transactions, or staging tables.
>
>
> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley  wrote:
>
>> Hi all,
>>
>> I've been prototyping an implementation of the DataSource V2 writer for
>> the MongoDB Spark Connector and I have a couple of questions about how its
>> intended to be used with database systems. According to the Javadoc for
>> DataWriter.commit():
>>
>>
>> *"this method should still "hide" the written data and ask the
>> DataSourceWriter at driver side to do the final commit via
>> WriterCommitMessage"*
>>
>> Although, MongoDB now has transactions, it doesn't have a way to "hide"
>> the data once it has been written. So as soon as the DataWriter has
>> committed the data, it has been inserted/updated in the collection and is
>> discoverable - thereby breaking the documented contract.
>>
>> I was wondering how other databases systems plan to implement this API
>> and meet the contract as per the Javadoc?
>>
>> Many thanks
>>
>> Ross
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Ryan Blue
rk should
>>>>> also integrate 2PC with its checkpointing mechanism to guarantee
>>>>> completeness of batch. And it might require different integration for
>>>>> continuous mode.
>>>>>
>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>
>>>>> 2018년 9월 11일 (화) 오전 4:37, Arun Mahadevan 님이 작성:
>>>>>
>>>>>> In some cases the implementations may be ok with eventual consistency
>>>>>> (and does not care if the output is written out atomically)
>>>>>>
>>>>>> XA can be one option for datasources that supports it and requires
>>>>>> atomicity but I am not sure how would one implement it with the current
>>>>>> API.
>>>>>>
>>>>>> May be we need to discuss improvements at the Datasource V2 API level
>>>>>> (e.g. individual tasks would "prepare" for commit and once the driver
>>>>>> receives "prepared" from all the tasks, a "commit" would be invoked at 
>>>>>> each
>>>>>> of the individual tasks). Right now the responsibility of the final
>>>>>> "commit" is with the driver and it may not always be possible for the
>>>>>> driver to take over the transactions started by the tasks.
>>>>>>
>>>>>>
>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal 
>>>>>> wrote:
>>>>>>
>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>> coordinating a
>>>>>>> single transaction. That's certainly the case for almost all relational
>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>> (consider
>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>
>>>>>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for
>>>>>>> this ? Not sure how easy it is to implement this though :-)
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dilip Biswal
>>>>>>> Tel: 408-463-4980 <(408)%20463-4980>
>>>>>>> dbis...@us.ibm.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> - Original message -
>>>>>>> From: Reynold Xin 
>>>>>>> To: Ryan Blue 
>>>>>>> Cc: ross.law...@gmail.com, dev 
>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>
>>>>>>> I don't think the problem is just whether we have a starting point
>>>>>>> for write. As a matter of fact there's always a starting point for 
>>>>>>> write,
>>>>>>> whether it is explicit or implicit.
>>>>>>>
>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>> coordinating a
>>>>>>> single transaction. That's certainly the case for almost all relational
>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>> (consider
>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Ross, I think the intent is to create a single transaction on the
>>>>>>> driver, write as part of it in each task, and then commit the 
>>>>>>> transaction
>>>>>>> once the tasks complete. Is that possible in your implementation?
>>>>>>>
>>>>>>> I think that part of this is made more difficult by not having a
>>>>>>> clear starting point for a write, which we are fixing in the redesign of
>>>>>>> the v2 API. That will have a method that creates a Write to track the
>>>>>>> operation. That can create your transaction when it is created and 
>>>>>>> commit
>>>>>>> the transaction when commit is called on it.
>>>>>>>
>>>>>>> rb
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I've been prototyping an implementation of the DataSource V2 writer
>>>>>>> for the MongoDB Spark Connector and I have a couple of questions about 
>>>>>>> how
>>>>>>> its intended to be used with database systems. According to the Javadoc 
>>>>>>> for
>>>>>>> DataWriter.commit():
>>>>>>>
>>>>>>> *"this method should still "hide" the written data and ask the
>>>>>>> DataSourceWriter at driver side to do the final commit via
>>>>>>> WriterCommitMessage"*
>>>>>>>
>>>>>>> Although, MongoDB now has transactions, it doesn't have a way to
>>>>>>> "hide" the data once it has been written. So as soon as the DataWriter 
>>>>>>> has
>>>>>>> committed the data, it has been inserted/updated in the collection and 
>>>>>>> is
>>>>>>> discoverable - thereby breaking the documented contract.
>>>>>>>
>>>>>>> I was wondering how other databases systems plan to implement this
>>>>>>> API and meet the contract as per the Javadoc?
>>>>>>>
>>>>>>> Many thanks
>>>>>>>
>>>>>>> Ross
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -
>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>
>>>>>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Ryan Blue
nator can't take over transactions from writers to do the
>>>>> final commit.
>>>>>
>>>>> XA is also not a trivial one to get it correctly with current
>>>>> execution model: Spark doesn't require writer tasks to run at the same 
>>>>> time
>>>>> but to achieve 2PC they should run until end of transaction (closing 
>>>>> client
>>>>> before transaction ends normally means aborting transaction). Spark should
>>>>> also integrate 2PC with its checkpointing mechanism to guarantee
>>>>> completeness of batch. And it might require different integration for
>>>>> continuous mode.
>>>>>
>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>
>>>>> 2018년 9월 11일 (화) 오전 4:37, Arun Mahadevan 님이 작성:
>>>>>
>>>>>> In some cases the implementations may be ok with eventual consistency
>>>>>> (and does not care if the output is written out atomically)
>>>>>>
>>>>>> XA can be one option for datasources that supports it and requires
>>>>>> atomicity but I am not sure how would one implement it with the current
>>>>>> API.
>>>>>>
>>>>>> May be we need to discuss improvements at the Datasource V2 API level
>>>>>> (e.g. individual tasks would "prepare" for commit and once the driver
>>>>>> receives "prepared" from all the tasks, a "commit" would be invoked at 
>>>>>> each
>>>>>> of the individual tasks). Right now the responsibility of the final
>>>>>> "commit" is with the driver and it may not always be possible for the
>>>>>> driver to take over the transactions started by the tasks.
>>>>>>
>>>>>>
>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal 
>>>>>> wrote:
>>>>>>
>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>> coordinating a
>>>>>>> single transaction. That's certainly the case for almost all relational
>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>> (consider
>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>
>>>>>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for
>>>>>>> this ? Not sure how easy it is to implement this though :-)
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dilip Biswal
>>>>>>> Tel: 408-463-4980 <(408)%20463-4980>
>>>>>>> dbis...@us.ibm.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> - Original message -
>>>>>>> From: Reynold Xin 
>>>>>>> To: Ryan Blue 
>>>>>>> Cc: ross.law...@gmail.com, dev 
>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>
>>>>>>> I don't think the problem is just whether we have a starting point
>>>>>>> for write. As a matter of fact there's always a starting point for 
>>>>>>> write,
>>>>>>> whether it is explicit or implicit.
>>>>>>>
>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>> coordinating a
>>>>>>> single transaction. That's certainly the case for almost all relational
>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>> (consider
>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Ross, I think the intent is to create a single transaction on the
>>>>>>> driver, write as part of it in each task, and then commit the 
>>>>>>> transaction
>>>>>>> once the tasks complete. Is that possible in your implementation?
>>>>>>>
>>>>>>> I think that part of this is made more difficult by not having a
>>>>>>> clear starting point for a write, which we are fixing in the redesign of
>>>>>>> the v2 API. That will have a method that creates a Write to track the
>>>>>>> operation. That can create your transaction when it is created and 
>>>>>>> commit
>>>>>>> the transaction when commit is called on it.
>>>>>>>
>>>>>>> rb
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I've been prototyping an implementation of the DataSource V2 writer
>>>>>>> for the MongoDB Spark Connector and I have a couple of questions about 
>>>>>>> how
>>>>>>> its intended to be used with database systems. According to the Javadoc 
>>>>>>> for
>>>>>>> DataWriter.commit():
>>>>>>>
>>>>>>> *"this method should still "hide" the written data and ask the
>>>>>>> DataSourceWriter at driver side to do the final commit via
>>>>>>> WriterCommitMessage"*
>>>>>>>
>>>>>>> Although, MongoDB now has transactions, it doesn't have a way to
>>>>>>> "hide" the data once it has been written. So as soon as the DataWriter 
>>>>>>> has
>>>>>>> committed the data, it has been inserted/updated in the collection and 
>>>>>>> is
>>>>>>> discoverable - thereby breaking the documented contract.
>>>>>>>
>>>>>>> I was wondering how other databases systems plan to implement this
>>>>>>> API and meet the contract as per the Javadoc?
>>>>>>>
>>>>>>> Many thanks
>>>>>>>
>>>>>>> Ross
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -
>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>
>>>>>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: [Discuss] Datasource v2 support for Kerberos

2018-09-19 Thread Ryan Blue
I’m not a huge fan of special cases for configuration values like this. Is
there something that we can do to pass a set of values to all sources (and
catalogs for #21306)?

I would prefer adding a special prefix for options that are passed to all
sources, like this:

spark.sql.catalog.shared.shared-property = value0
spark.sql.catalog.jdbc-prod.prop = value1
spark.datasource.source-name.prop = value2

All of the properties in the shared namespace would be passed to all
catalogs and sources. What do you think?

On Sun, Sep 16, 2018 at 6:51 PM Wenchen Fan  wrote:

> I'm +1 for this proposal: "Extend SessionConfigSupport to support passing
> specific white-listed configuration values"
>
> One goal of data source v2 API is to not depend on any high-level APIs
> like SparkSession, SQLConf, etc. If users do want to access these
> high-level APIs, there is a workaround: calling `SparkSession.getActive` or
> `SQLConf.get`.
>
> In the meanwhile, I think you use case makes sense. `SessionConfigSupport`
> is created for this use case but it's not powerful enough yet. I think it
> should support multiple key-prefixes and white-list.
>
> Feel free to submit a patch, and thanks for looking into it!
>
> On Sun, Sep 16, 2018 at 2:40 PM tigerquoll  wrote:
>
>> The current V2 Datasource API provides support for querying a portion of
>> the
>> SparkConfig namespace (spark.datasource.*) via the SessionConfigSupport
>> API.
>> This was designed with the assumption that all configuration information
>> for
>> v2 data sources should be separate from each other.
>>
>> Unfortunately, there are some cross-cutting concerns such as
>> authentication
>> that touch multiple data sources - this means that common configuration
>> items need to be shared amongst multiple data sources.
>> In particular, Kerberos setup can use the following configuration items:
>>
>> * userPrincipal,
>> * userKeytabPath
>> * krb5ConfPath
>> * kerberos debugging flags
>> * spark.security.credentials.${service}.enabled
>> * JAAS config
>> * ZKServerPrincipal ??
>>
>> So potential solutions I can think of to pass this information to various
>> data sources are:
>>
>> * Pass the entire SparkContext object to data sources (not likely)
>> * Pass the entire SparkConfig Map object to data sources
>> * Pass all required configuration via environment variables
>> * Extend SessionConfigSupport to support passing specific white-listed
>> configuration values
>> * Add a specific data source v2 API "SupportsKerberos" so that a data
>> source
>> can indicate that it supports Kerberos and also provide the means to pass
>> needed configuration info.
>> * Expand out all Kerberos configuration items to be in each data source
>> config namespace that needs it.
>>
>> If the data source requires TLS support then we also need to support
>> passing
>> all the  configuration values under  "spark.ssl.*"
>>
>> What do people think?  Placeholder Issue has been added at SPARK-25329.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: [Discuss] Datasource v2 support for manipulating partitions

2018-09-19 Thread Ryan Blue
I'm open to exploring the idea of adding partition management as a catalog
API. The approach we're taking is to have an interface for each concern a
catalog might implement, like TableCatalog (proposed in SPARK-24252), but
also FunctionCatalog for stored functions and possibly
PartitionedTableCatalog for explicitly partitioned tables.

That could definitely be used to implement ALTER TABLE ADD/DROP PARTITION
for Hive tables, although I'm not sure that we would want to continue
exposing partitions for simple tables. I know that this is important for
storage systems like Kudu, but I think it is needlessly difficult and
annoying for simple tables that are partitioned by a regular transformation
like Hive tables. That's why Iceberg hides partitioning outside of table
configuration. That also avoids problems where SELECT DISTINCT queries are
wrong because a partition exists but has no data.

How useful is this outside of Kudu? Is it something that we should provide
an API for, or is it specific enough to Kudu that Spark shouldn't include
it in the API for all sources?

rb


On Tue, Sep 18, 2018 at 7:38 AM Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Totally agree with you Dale, that there are situations for efficiency,
> performance and better control/visibility/manageability that we need to
> expose partition management.
>
> So as described, I suggested two things - the ability to do it in the
> current V2 API form via options and appropriate implementation in
> datasource reader/writer.
>
> And for long term, suggested that partition management can be made part of
> metadata/catalog management - SPARK-24252 (DataSourceV2: Add catalog
> support)?
>
>
> On 9/17/18, 8:26 PM, "tigerquoll"  wrote:
>
> Hi Jayesh,
> I get where you are coming from - partitions are just an implementation
> optimisation that we really shouldn’t be bothering the end user with.
> Unfortunately that view is like saying RPC is like a procedure call,
> and
> details of the network transport should be hidden from the end user.
> CORBA
> tried this approach for RPC and failed for the same reason that no
> major
> vendor of DBMS systems that support partitions try to hide them from
> the end
> user.  They have a substantial real world effect that is impossible to
> hide
> from the user (in particular when writing/modifying the data source).
> Any
> attempt to “take care” of partitions automatically invariably guesses
> wrong
> and ends up frustrating the end user (as “substantial real world
> effect”
> turns to “show stopping performance penalty” if the user attempts to
> fight
> against a partitioning scheme she has no idea exists)
>
> So if we are not hiding them from the user, we need to allow users to
> manipulate them. Either by representing them generically in the API,
> allowing pass-through commands to manipulate them, or by some other
> means.
>
> Regards,
> Dale.
>
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
>
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: Datasource v2 Select Into support

2018-09-19 Thread Ryan Blue
Ross,

The problem you're hitting is that there aren't many logical plans that
work with the v2 source API yet. Here, you're creating an InsertIntoTable
logical plan from SQL, which can't be converted to a physical plan because
there is no rule to convert it either to the right logical plan for v2,
AppendData.

What we need to do next is to get the new logical plans for v2 into Spark,
like CreateTableAsSelect and ReplacePartitionsDynamic, and then add
analysis rules to convert from the plans created by the SQL parser to those
v2 plans. The reason why we're adding new logical plans is to clearly
define the behavior of these queries, including the rules that validate
data can be written and how the operation is implemented, like creating a
table before writing to it for CTAS.

I currently have a working implementation of this, but we're blocked
getting it into upstream Spark on the issue to add the TableCatalog API.
I'd love to get that in so we can get more of our implementation submitted.

rb

On Thu, Sep 6, 2018 at 6:54 AM Wenchen Fan  wrote:

> Data source v2 catalog support(table/view) is still in progress. There are
> several threads in the dev list discussing it, please join the discussion
> if you are interested. Thanks for trying!
>
> On Thu, Sep 6, 2018 at 7:23 PM Ross Lawley  wrote:
>
>> Hi,
>>
>> I hope this is the correct mailinglist. I've been adding v2 support to
>> the MongoDB Spark connector using Spark 2.3.1.  I've noticed one of my
>> tests pass when using the original DefaultSource but errors with my v2
>> implementation:
>>
>> The code I'm running is:
>> val df = spark.loadDS[Character]()
>> df.createOrReplaceTempView("people")
>> spark.sql("INSERT INTO table people SELECT 'Mort', 1000")
>>
>> The error I see is:
>> unresolved operator 'InsertIntoTable DataSourceV2Relation [name#0,
>> age#1], MongoDataSourceReader ...
>> 'InsertIntoTable DataSourceV2Relation [name#0, age#1],
>> MongoDataSourceReader 
>> +- Project [Mort AS Mort#7, 1000 AS 1000#8]
>>+- OneRowRelation
>>
>> My DefaultSource V2 implementation extends DataSourceV2 with ReadSupport
>> with ReadSupportWithSchema with WriteSupport
>>
>> I'm wondering if there is something I'm not implementing, or if there is
>> a bug in my implementation or its an issue with Spark?
>>
>> Any pointers would be great,
>>
>> Ross
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: data source api v2 refactoring

2018-09-19 Thread Ryan Blue
Hi Jayesh,

The existing sources haven't been ported to v2 yet. That is going to be
tricky because the existing sources implement behaviors that we need to
keep for now.

I wrote up an SPIP to standardize logical plans while moving to the v2
sources. The reason why we need this is that too much is delegated to
sources today. For example, sources are handed a SaveMode to overwrite
data, but what exactly gets overwritten isn't defined and it varies by the
source that gets used. That's not a good thing and we want to clean up what
happens so that users know that a query behaves the same way across all v2
sources. CTAS shouldn't succeed for one source but fail for another if the
table already exists.

Standardizing plans makes it difficult to port the existing sources to v2
because we need to implement the behavior of the v2 plans, which may not be
the existing v1 behavior. I think what we should do is keep the existing v1
sources working as they do today, and add a way to opt in for v2 behavior.
One good way to do this is to use a new write API that is more clear; I
proposed one in the SPIP I mentioned earlier. SQL is a bit easier because
the behavior for SQL is fairly well-defined. The problem is mostly with the
existing DF write API, DataFrameWriter.

It would be great to open a discussion about the compatibility between v1
and v2 and come up with a plan on this list.

rb

On Fri, Sep 7, 2018 at 2:12 PM Thakrar, Jayesh 
wrote:

> Ryan et al,
>
>
>
> Wondering if the existing Spark based data sources (e.g. for HDFS, Kafka)
> have been ported to V2.
>
> I remember reading threads where there were discussions about the
> inefficiency/overhead of converting from Row to InternalRow that was
> preventing certain porting effort etc.
>
>
>
> I ask because those are the most widely used data sources and have a lot
> of effort and thinking behind them, and if they have ported over to V2,
> then they can serve as excellent production examples of V2 API.
>
>
>
> Thanks,
>
> Jayesh
>
>
>
> *From: *Ryan Blue 
> *Reply-To: *
> *Date: *Friday, September 7, 2018 at 2:19 PM
> *To: *Wenchen Fan 
> *Cc: *Hyukjin Kwon , Spark Dev List <
> dev@spark.apache.org>
> *Subject: *Re: data source api v2 refactoring
>
>
>
> There are a few v2-related changes that we can work in parallel, at least
> for reviews:
>
>
>
> * SPARK-25006, #21978 <https://github.com/apache/spark/pull/21978>: Add
> catalog to TableIdentifier - this proposes how to incrementally add
> multi-catalog support without breaking existing code paths
>
> * SPARK-24253, #21308 <https://github.com/apache/spark/pull/21308>: Add
> DeleteSupport API - this is a small API addition, which doesn't affect the
> refactor
>
> * SPARK-24252, #21306 <https://github.com/apache/spark/pull/21306>: Add
> v2 Catalog API - this is a different way to create v2 tables, also doesn't
> affect the refactor
>
>
>
> I agree that the PR for adding SQL support should probably wait on the
> refactor. I have also been meaning to share our implementation, which isn't
> based on the refactor. It handles CTAS, RTAS, InsertInto, DeleteFrom, and
> AlterTable from both SQL and the other methods in the DF API, saveAsTable
> and insertInto. It follows the structure that I proposed on the SQL support
> PR to convert SQL plans to v2 plans and uses the new TableCatalog to
> implement CTAS and RTAS.
>
>
>
> rb
>
>
>
>
>
> On Fri, Sep 7, 2018 at 12:27 AM Wenchen Fan  wrote:
>
> Hi Ryan,
>
>
>
> You are right that the `LogicalWrite` mirrors the read side API. I just
> don't have a good naming yet, and write side changes will be a different PR.
>
>
>
>
>
> Hi Hyukjin,
>
>
>
> That's my expectation, otherwise we keep rebasing the refactor PR and
> never get it done.
>
>
>
> On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon  wrote:
>
> BTW, do we hold Datasource V2 related PRs for now until we finish this
> refactoring just for clarification?
>
>
>
> 2018년 9월 7일 (금) 오전 12:52, Ryan Blue 님이 작성:
>
> Wenchen,
>
>
>
> I'm not really sure what you're proposing here. What is a `LogicalWrite`?
> Is it something that mirrors the read side in your PR?
>
>
>
> I think that I agree that if we have a Write independent of the Table that
> carries the commit and abort methods, then we can create it directly
> without a WriteConfig. So I tentatively agree with what you propose,
> assuming that I understand it correctly.
>
>
>
> rb
>
>
>
> On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan  wrote:
>
> I'm switching to my another Gmail account, let's see if it still get

Re: [Discuss] Datasource v2 support for manipulating partitions

2018-09-19 Thread Ryan Blue
What does partition management look like in those systems and what are the
options we would standardize in an API?

On Wed, Sep 19, 2018 at 2:16 PM Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> I think partition management feature would be very useful in RDBMSes that
> support it – e.g. Oracle, PostgreSQL, and DB2.
>
> In some cases add partitions can be explicit and can/may be done outside
> of data loads.
>
> But in some other cases, it may/can need to be done implicitly when
> supported  by the platform.
>
> Similar to the static/dynamic partition loading in Hive and Oracle.
>
>
>
> So in short, I agree that partition management should be an optional
> interface.
>
>
>
> *From: *Ryan Blue 
> *Reply-To: *"rb...@netflix.com" 
> *Date: *Wednesday, September 19, 2018 at 2:58 PM
> *To: *"Thakrar, Jayesh" 
> *Cc: *"tigerqu...@outlook.com" , Spark Dev List <
> dev@spark.apache.org>
> *Subject: *Re: [Discuss] Datasource v2 support for manipulating partitions
>
>
>
> I'm open to exploring the idea of adding partition management as a catalog
> API. The approach we're taking is to have an interface for each concern a
> catalog might implement, like TableCatalog (proposed in SPARK-24252), but
> also FunctionCatalog for stored functions and possibly
> PartitionedTableCatalog for explicitly partitioned tables.
>
>
>
> That could definitely be used to implement ALTER TABLE ADD/DROP PARTITION
> for Hive tables, although I'm not sure that we would want to continue
> exposing partitions for simple tables. I know that this is important for
> storage systems like Kudu, but I think it is needlessly difficult and
> annoying for simple tables that are partitioned by a regular transformation
> like Hive tables. That's why Iceberg hides partitioning outside of table
> configuration. That also avoids problems where SELECT DISTINCT queries are
> wrong because a partition exists but has no data.
>
>
>
> How useful is this outside of Kudu? Is it something that we should provide
> an API for, or is it specific enough to Kudu that Spark shouldn't include
> it in the API for all sources?
>
>
>
> rb
>
>
>
>
>
> On Tue, Sep 18, 2018 at 7:38 AM Thakrar, Jayesh <
> jthak...@conversantmedia.com> wrote:
>
> Totally agree with you Dale, that there are situations for efficiency,
> performance and better control/visibility/manageability that we need to
> expose partition management.
>
> So as described, I suggested two things - the ability to do it in the
> current V2 API form via options and appropriate implementation in
> datasource reader/writer.
>
> And for long term, suggested that partition management can be made part of
> metadata/catalog management - SPARK-24252 (DataSourceV2: Add catalog
> support)?
>
>
> On 9/17/18, 8:26 PM, "tigerquoll"  wrote:
>
> Hi Jayesh,
> I get where you are coming from - partitions are just an implementation
> optimisation that we really shouldn’t be bothering the end user with.
> Unfortunately that view is like saying RPC is like a procedure call,
> and
> details of the network transport should be hidden from the end user.
> CORBA
> tried this approach for RPC and failed for the same reason that no
> major
> vendor of DBMS systems that support partitions try to hide them from
> the end
> user.  They have a substantial real world effect that is impossible to
> hide
> from the user (in particular when writing/modifying the data source).
> Any
> attempt to “take care” of partitions automatically invariably guesses
> wrong
> and ends up frustrating the end user (as “substantial real world
> effect”
> turns to “show stopping performance penalty” if the user attempts to
> fight
> against a partitioning scheme she has no idea exists)
>
> So if we are not hiding them from the user, we need to allow users to
> manipulate them. Either by representing them generically in the API,
> allowing pass-through commands to manipulate them, or by some other
> means.
>
> Regards,
> Dale.
>
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: [VOTE] SPARK 2.3.2 (RC6)

2018-09-20 Thread Ryan Blue
; https://repository.apache.org/content/repositories/orgapachespark-1286/
>>>>>>>> >
>>>>>>>> > The documentation corresponding to this release can be found at:
>>>>>>>> > https://dist.apache.org/repos/dist/dev/spark/v2.3.2-rc6-docs/
>>>>>>>> >
>>>>>>>> > The list of bug fixes going into 2.3.2 can be found at the
>>>>>>>> following URL:
>>>>>>>> > https://issues.apache.org/jira/projects/SPARK/versions/12343289
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > FAQ
>>>>>>>> >
>>>>>>>> > =
>>>>>>>> > How can I help test this release?
>>>>>>>> > =
>>>>>>>> >
>>>>>>>> > If you are a Spark user, you can help us test this release by
>>>>>>>> taking
>>>>>>>> > an existing Spark workload and running on this release candidate,
>>>>>>>> then
>>>>>>>> > reporting any regressions.
>>>>>>>> >
>>>>>>>> > If you're working in PySpark you can set up a virtual env and
>>>>>>>> install
>>>>>>>> > the current RC and see if anything important breaks, in the
>>>>>>>> Java/Scala
>>>>>>>> > you can add the staging repository to your projects resolvers and
>>>>>>>> test
>>>>>>>> > with the RC (make sure to clean up the artifact cache
>>>>>>>> before/after so
>>>>>>>> > you don't end up building with a out of date RC going forward).
>>>>>>>> >
>>>>>>>> > ===
>>>>>>>> > What should happen to JIRA tickets still targeting 2.3.2?
>>>>>>>> > ===
>>>>>>>> >
>>>>>>>> > The current list of open tickets targeted at 2.3.2 can be found
>>>>>>>> at:
>>>>>>>> > https://issues.apache.org/jira/projects/SPARK and search for
>>>>>>>> "Target Version/s" = 2.3.2
>>>>>>>> >
>>>>>>>> > Committers should look at those and triage. Extremely important
>>>>>>>> bug
>>>>>>>> > fixes, documentation, and API tweaks that impact compatibility
>>>>>>>> should
>>>>>>>> > be worked on immediately. Everything else please retarget to an
>>>>>>>> > appropriate release.
>>>>>>>> >
>>>>>>>> > ==
>>>>>>>> > But my bug isn't fixed?
>>>>>>>> > ==
>>>>>>>> >
>>>>>>>> > In order to make timely releases, we will typically not hold the
>>>>>>>> > release unless the bug in question is a regression from the
>>>>>>>> previous
>>>>>>>> > release. That being said, if there is something which is a
>>>>>>>> regression
>>>>>>>> > that has not been correctly targeted please ping me or a
>>>>>>>> committer to
>>>>>>>> > help target the issue.
>>>>>>>>
>>>>>>>>
>>>>>>>> -
>>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>>>
>>>>>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>> --
>>> John
>>>
>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: [VOTE] SPARK 2.3.2 (RC6)

2018-09-20 Thread Ryan Blue
Changing my vote to +1 with this fixed.

Here's what was going on -- and thanks to Owen O'Malley for debugging:

The problem was that Iceberg contained a fix for a JVM bug for timestamps
before the unix epoch where the timestamp was off by 1s. Owen moved this
code into ORC as well and using the new version of Spark pulled in the
newer version of ORC. That meant that the values were "fixed" twice and
were wrong.

Updating the Iceberg code to rely on the fix in the version of ORC that
Spark includes fixes the problem.

On Thu, Sep 20, 2018 at 2:38 PM Dongjoon Hyun 
wrote:

> Hi, Ryan.
>
> Could you share the result on 2.3.1 since this is 2.3.2 RC? That would be
> helpful to narrow down the scope.
>
> Bests,
> Dongjoon.
>
> On Thu, Sep 20, 2018 at 11:56 Ryan Blue  wrote:
>
>> -0
>>
>> My DataSourceV2 implementation for Iceberg is failing ORC tests when I
>> run with the 2.3.2 RC that pass when I run with 2.3.0. I'm tracking down
>> the cause and will report back, but I'm -0 on the release because there may
>> be a behavior change.
>>
>> On Thu, Sep 20, 2018 at 10:37 AM Denny Lee  wrote:
>>
>>> +1
>>>
>>> On Thu, Sep 20, 2018 at 9:55 AM Xiao Li  wrote:
>>>
>>>> +1
>>>>
>>>>
>>>> John Zhuge  于2018年9月19日周三 下午1:17写道:
>>>>
>>>>> +1 (non-binding)
>>>>>
>>>>> Built on Ubuntu 16.04 with Maven flags: -Phadoop-2.7 -Pmesos -Pyarn
>>>>> -Phive-thriftserver -Psparkr -Pkinesis-asl -Phadoop-provided
>>>>>
>>>>> java version "1.8.0_181"
>>>>> Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
>>>>> Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
>>>>>
>>>>>
>>>>> On Wed, Sep 19, 2018 at 2:31 AM Takeshi Yamamuro <
>>>>> linguin@gmail.com> wrote:
>>>>>
>>>>>> +1
>>>>>>
>>>>>> I also checked `-Pyarn -Phadoop-2.7 -Pkinesis-asl -Phive
>>>>>> -Phive-thriftserve` on the openjdk below/macOSv10.12.6
>>>>>>
>>>>>> $ java -version
>>>>>> java version "1.8.0_181"
>>>>>> Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
>>>>>> Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
>>>>>>
>>>>>>
>>>>>> On Wed, Sep 19, 2018 at 10:45 AM Dongjoon Hyun <
>>>>>> dongjoon.h...@gmail.com> wrote:
>>>>>>
>>>>>>> +1.
>>>>>>>
>>>>>>> I tested with `-Pyarn -Phadoop-2.7 -Pkinesis-asl -Phive
>>>>>>> -Phive-thriftserve` on OpenJDK(1.8.0_181)/CentOS 7.5.
>>>>>>>
>>>>>>> I hit the following test case failure once during testing, but it's
>>>>>>> not persistent.
>>>>>>>
>>>>>>> KafkaContinuousSourceSuite
>>>>>>> ...
>>>>>>> subscribing topic by name from earliest offsets (failOnDataLoss:
>>>>>>> false) *** FAILED ***
>>>>>>>
>>>>>>> Thank you, Saisai.
>>>>>>>
>>>>>>> Bests,
>>>>>>> Dongjoon.
>>>>>>>
>>>>>>> On Mon, Sep 17, 2018 at 6:48 PM Saisai Shao 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> +1 from my own side.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Saisai
>>>>>>>>
>>>>>>>> Wenchen Fan  于2018年9月18日周二 上午9:34写道:
>>>>>>>>
>>>>>>>>> +1. All the blocker issues are all resolved in 2.3.2 AFAIK.
>>>>>>>>>
>>>>>>>>> On Tue, Sep 18, 2018 at 9:23 AM Sean Owen 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> +1 . Licenses and sigs check out as in previous 2.3.x releases. A
>>>>>>>>>> build from source with most profiles passed for me.
>>>>>>>>>> On Mon, Sep 17, 2018 at 8:17 AM Saisai Shao <
>>>>>>>>>> sai.sai.s...@gmail.com> wrote:
>>>>>>>>>> >
>>>>>>>>>> > Please vote on releasing the following candidat

Re: [Discuss] Datasource v2 support for Kerberos

2018-09-24 Thread Ryan Blue
Dale, what do you think about the option that I suggested? I think that's
different from the ones that you just listed.

Basically, the idea is to have a "shared" set of options that are passed to
all sources. This would not be a whitelist, it would be a namespace that
ends up passed in everywhere. That way, kerberos options would be set in
the shared space, but could be set directly if you want to override.

The problem I have with your option 1 is that it requires a whiltelist,
which is difficult to maintain and doesn't have obvious behavior. If a user
wants to share an option, it has to be a special one. Otherwise the user
has to wait until we add it to a whitelist, which is slow.

I don't think your option 2 works because that's no better than what we do
today. And as you said, isolating config is a good goal.

Your option 3 is basically a whitelist, but with additional interfaces to
activate the option sets to forward. I think that's a bit too intrusive and
shares the problems that a whitelist has.

The option I'm proposing gets around those issues because it is obvious
what is happening. Any option under the shared namespace is copied to all
sources and catalogs. That doesn't require Spark to do anything to support
specific sets of options and is predictable behavior for users to
understand. It also allows us to maintain separation instead of passing all
options. I think this is a good option overall.

What do you think?

rb

On Sun, Sep 23, 2018 at 5:21 PM tigerquoll  wrote:

> I believe the current spark config system is unfortunate in the way it has
> grown - you have no way of telling which sub-systems uses which
> configuration options without direct and detailed reading of the code.
>
> Isolating config items for datasources into a separate namespaces (rather
> then using a whitelist), is a nice idea - unfortunately in this case we are
> dealing with configuration items that have been exposed to end-users in
> their current from for a significant amount of time, and Kerberos
> cross-cuts
> not only datasources, but also things like YARN.
>
> So given that fact - the best options of a way forward I can think of are:
> 1. Whitelisting of specific sub sections of the configuration space, or
> 2. Just pass in a Map[String,String] of all config values
> 3. Implement a specific interface for data sources to indicate/implement
> Kerberos support
>
> Option (1), is pretty arbitrary, and more then likely the whitelist will
> change from version to version as additional items get added to it.  Data
> sources will develop dependencies on certain configuration values being
> present in the white list.
>
> Option (2) would work, but continues the practice of having a vaguely
> specified grab-bag of config items as a dependency for practically all
> Spark
> code.
>
> I am beginning to to warm to option (3), it would be a clean way of
> declaring that a data source supports Kerberos, and also a cleanly
> specified
> way of injecting the relevant Kerberos configuration information into the
> data source - and we will not need to change any user-facing configuration
> items as well.
>
>
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: [Discuss] Datasource v2 support for Kerberos

2018-09-25 Thread Ryan Blue
I agree with Wenchen that we'd remove the prefix when passing to a source,
so you could use the same "spark.yarn.keytab" option in both places. But I
think the problem is that "spark.yarn.keytab" still needs to be set, and it
clearly isn't in a shared namespace for catalog options. So I think we
would still need a solution for existing options. I'm more comfortable with
a white list for existing options that we want to maintain compatibility
with.

rb



On Mon, Sep 24, 2018 at 11:52 PM tigerquoll  wrote:

> To give some Kerberos specific examples, The spark-submit args:
> -–conf spark.yarn.keytab=path_to_keytab -–conf
> spark.yarn.principal=princi...@realm.com
>
> are currently not passed through to the data sources.
>
>
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

-- 
Ryan Blue
Software Engineer
Netflix


[DISCUSS] Syntax for table DDL

2018-09-28 Thread Ryan Blue
Hi everyone,

I’m currently working on new table DDL statements for v2 tables. For
context, the new logical plans for DataSourceV2 require a catalog interface
so that Spark can create tables for operations like CTAS. The proposed
TableCatalog API also includes an API for altering those tables so we can
make ALTER TABLE statements work. I’m implementing those DDL statements,
which will make it into upstream Spark when the TableCatalog PR is merged.

Since I’m adding new SQL statements that don’t yet exist in Spark, I want
to make sure that the syntax I’m using in our branch will match the syntax
we add to Spark later. I’m basing this proposed syntax on PostgreSQL
<https://www.postgresql.org/docs/current/static/ddl-alter.html>.

   - *Update data type*: ALTER TABLE tableIdentifier ALTER COLUMN
   qualifiedName TYPE dataType.
   - *Rename column*: ALTER TABLE tableIdentifier RENAME COLUMN
   qualifiedName TO qualifiedName
   - *Drop column*: ALTER TABLE tableIdentifier DROP (COLUMN | COLUMNS)
   qualifiedNameList

A few notes:

   - Using qualifiedName in these rules allows updating nested types, like
   point.x.
   - Updates and renames can only alter one column, but drop can drop a
   list.
   - Rename can’t move types and will validate that if the TO name is
   qualified, that the prefix matches the original field.
   - I’m also changing ADD COLUMN to support adding fields to nested
   columns by using qualifiedName instead of identifier.

Please reply to this thread if you have suggestions based on a different
SQL engine or want this syntax to be different for another reason. Thanks!

rb
-- 
Ryan Blue
Software Engineer
Netflix


Re: Data source V2 in spark 2.4.0

2018-10-01 Thread Ryan Blue
Hi Assaf,
The major changes to the V2 API that you linked to aren’t going into 2.4.
Those will be in the next release because they weren’t finished in time for
2.4.

Here are the major updates that will be in 2.4:

   - SPARK-23323 <https://issues.apache.org/jira/browse/SPARK-23323>: The
   output commit coordinator is used by default to ensure only one attempt of
   each task commits.
   - SPARK-23325 <https://issues.apache.org/jira/browse/SPARK-23325> and
   SPARK-24971 <https://issues.apache.org/jira/browse/SPARK-24971>: Readers
   should always produce InternalRow instead of Row or UnsafeRow; see
   SPARK-23325 for detail.
   - SPARK-24990 <https://issues.apache.org/jira/browse/SPARK-24990>:
   ReadSupportWithSchema was removed, the user-supplied schema option was
   added to ReadSupport.
   - SPARK-24073 <https://issues.apache.org/jira/browse/SPARK-24073>: Read
   splits are now called InputPartition and a few methods were also renamed
   for clarity.
   - SPARK-25127 <https://issues.apache.org/jira/browse/SPARK-25127>:
   SupportsPushDownCatalystFilters was removed because it leaked Expression in
   the public API. V2 always uses the Filter API now.
   - SPARK-24478 <https://issues.apache.org/jira/browse/SPARK-24478>: Push
   down is now done when converting the a physical plan.

I think there are also quite a few updates for the streaming side, but I’m
not as familiar with those so I’ll let someone else jump in with a summary.

rb

On Mon, Oct 1, 2018 at 9:51 AM assaf.mendelson 
wrote:

> Hi all,
> I understood from previous threads that the Data source V2 API will see
> some
> changes in spark 2.4.0, however, I can't seem to find what these changes
> are.
>
> Is there some documentation which summarizes the changes?
>
> The only mention I seem to find is this pull request:
> https://github.com/apache/spark/pull/22009. Is this all of it?
>
> Thanks,
> Assaf.
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -----
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: [DISCUSS] Syntax for table DDL

2018-10-01 Thread Ryan Blue
What do you mean by consistent with the syntax in SqlBase.g4? These aren’t
currently defined, so we need to decide what syntax to support. There are
more details below, but the syntax I’m proposing is more standard across
databases than Hive, which uses confusing and non-standard syntax.

I doubt that we want to support Hive syntax for a few reasons. Hive uses
the same column CHANGE statement for multiple purposes, so it ends up with
strange patterns for simple tasks, like updating the column’s type:

ALTER TABLE t CHANGE a1 a1 INT;

The column name is doubled because old name, new name, and type are always
required. So you have to know the type of a column to change its name and
you have to double up the name to change its type. Hive also allows a
couple other oddities:

   - Column reordering with FIRST and AFTER keywords. Column reordering is
   tricky to get right so I’m not sure we want to add it.
   - RESTRICT and CASCADE to signal whether to change all partitions or
   not. Spark doesn’t support partition-level schemas except through Hive, and
   even then I’m not sure how reliable it is.

I know that we wouldn’t necessarily have to support these features from
Hive, but I’m pointing them out to ask the question: why copy Hive’s syntax
if it is unlikely that Spark will implement all of the “features”? I’d
rather go with SQL syntax from databases like PostgreSQL or others that are
more standard and common.

The more “standard” versions of these statements are like what I’ve
proposed:

   - ALTER TABLE ident ALTER COLUMN qualifiedName TYPE dataType: ALTER is
   used by SQL Server, Access, DB2, and PostgreSQL; MODIFY by MySQL and
   Oracle. COLUMN is optional in Oracle and TYPE is omitted by databases
   other than PosgreSQL. I think we could easily add MODIFY as an
   alternative to the second ALTER (and maybe alternatives like UPDATE and
   CHANGE) and make both TYPE and COLUMN optional.
   - ALTER TABLE ident RENAME COLUMN qualifiedName TO qualifiedName: This
   syntax is supported by PostgreSQL, Oracle, and DB2. MySQL uses the same
   syntax as Hive and it appears that SQL server doesn’t have this statement.
   This also match the table rename syntax, which uses TO.
   - ALTER TABLE ident DROP (COLUMN | COLUMNS) qualifiedNameList: This
   matches PostgreSQL, Oracle, DB2, and SQL server. MySQL makes COLUMN
   optional. Most don’t allow deleting multiple columns, but it’s a reasonable
   extension.

While we’re on the subject of ALTER TABLE DDL, I should note that all of
the databases use ADD COLUMN syntax that differs from Hive (and currently,
Spark):

   - ALTER TABLE ident ADD COLUMN qualifiedName dataType (',' qualifiedName
   dataType)*: All other databases I looked at use ADD COLUMN, but not all
   of them support adding multiple columns at the same time. Hive requires (
   and ) enclosing the columns and uses the COLUMNS keyword instead of
   COLUMN. I think that Spark should be updated to make the parens optional
   and to support both keywords, COLUMN and COLUMNS.

What does everyone think? Is it reasonable to use the more standard syntax
instead of using Hive as a base?

rb

On Fri, Sep 28, 2018 at 11:07 PM Xiao Li  wrote:

> Are they consistent with the current syntax defined in SqlBase.g4? I think
> we are following the Hive DDL syntax:
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AlterTable/Partition/Column
>
> Ryan Blue  于2018年9月28日周五 下午3:47写道:
>
>> Hi everyone,
>>
>> I’m currently working on new table DDL statements for v2 tables. For
>> context, the new logical plans for DataSourceV2 require a catalog interface
>> so that Spark can create tables for operations like CTAS. The proposed
>> TableCatalog API also includes an API for altering those tables so we can
>> make ALTER TABLE statements work. I’m implementing those DDL statements,
>> which will make it into upstream Spark when the TableCatalog PR is merged.
>>
>> Since I’m adding new SQL statements that don’t yet exist in Spark, I want
>> to make sure that the syntax I’m using in our branch will match the syntax
>> we add to Spark later. I’m basing this proposed syntax on PostgreSQL
>> <https://www.postgresql.org/docs/current/static/ddl-alter.html>.
>>
>>- *Update data type*: ALTER TABLE tableIdentifier ALTER COLUMN
>>qualifiedName TYPE dataType.
>>- *Rename column*: ALTER TABLE tableIdentifier RENAME COLUMN
>>qualifiedName TO qualifiedName
>>- *Drop column*: ALTER TABLE tableIdentifier DROP (COLUMN | COLUMNS)
>>qualifiedNameList
>>
>> A few notes:
>>
>>- Using qualifiedName in these rules allows updating nested types,
>>like point.x.
>>- Updates and renames can only alter one column, but drop can drop a
>>list.
>>- Rename can’t move types and will

Re: [DISCUSS] Syntax for table DDL

2018-10-02 Thread Ryan Blue
I'd say that it was important to be compatible with Hive in the past, but
that's becoming less important over time. Spark is well established with
Hadoop users and I think the focus moving forward should be to make Spark
more predictable as a SQL engine for people coming from more traditional
databases..

That said, I think there is no problem supporting the alter syntax for both
Hive/MySQL and the more standard versions.

On Tue, Oct 2, 2018 at 8:35 AM Felix Cheung 
wrote:

> I think it has been an important “selling point” that Spark is “mostly
> compatible“ with Hive DDL.
>
> I have see a lot of teams suffering from switching between Presto and Hive
> dialects.
>
> So one question I have is, we are at a point of switch from Hive
> compatible to ANSI SQL, say?
>
> Perhaps a more critical question, what does it take to get the platform to
> support both, by making the ANTLR extensible?
>
>
>
> --
> *From:* Alessandro Solimando 
> *Sent:* Tuesday, October 2, 2018 12:35 AM
> *To:* rb...@netflix.com
> *Cc:* Xiao Li; dev
> *Subject:* Re: [DISCUSS] Syntax for table DDL
>
> I agree with Ryan, a "standard" and more widely adopted syntax is usually
> a good idea, with possibly some slight improvements like "bulk deletion" of
> columns (especially because both the syntax and the semantics are clear),
> rather than stay with Hive syntax at any cost.
>
> I am personally following this PR with a lot of interest, thanks for all
> the work along this direction.
>
> Best regards,
> Alessandro
>
> On Mon, 1 Oct 2018 at 20:21, Ryan Blue  wrote:
>
>> What do you mean by consistent with the syntax in SqlBase.g4? These
>> aren’t currently defined, so we need to decide what syntax to support.
>> There are more details below, but the syntax I’m proposing is more standard
>> across databases than Hive, which uses confusing and non-standard syntax.
>>
>> I doubt that we want to support Hive syntax for a few reasons. Hive uses
>> the same column CHANGE statement for multiple purposes, so it ends up
>> with strange patterns for simple tasks, like updating the column’s type:
>>
>> ALTER TABLE t CHANGE a1 a1 INT;
>>
>> The column name is doubled because old name, new name, and type are
>> always required. So you have to know the type of a column to change its
>> name and you have to double up the name to change its type. Hive also
>> allows a couple other oddities:
>>
>>- Column reordering with FIRST and AFTER keywords. Column reordering
>>is tricky to get right so I’m not sure we want to add it.
>>- RESTRICT and CASCADE to signal whether to change all partitions or
>>not. Spark doesn’t support partition-level schemas except through Hive, 
>> and
>>even then I’m not sure how reliable it is.
>>
>> I know that we wouldn’t necessarily have to support these features from
>> Hive, but I’m pointing them out to ask the question: why copy Hive’s syntax
>> if it is unlikely that Spark will implement all of the “features”? I’d
>> rather go with SQL syntax from databases like PostgreSQL or others that are
>> more standard and common.
>>
>> The more “standard” versions of these statements are like what I’ve
>> proposed:
>>
>>- ALTER TABLE ident ALTER COLUMN qualifiedName TYPE dataType: ALTER
>>is used by SQL Server, Access, DB2, and PostgreSQL; MODIFY by MySQL
>>and Oracle. COLUMN is optional in Oracle and TYPE is omitted by
>>databases other than PosgreSQL. I think we could easily add MODIFY as
>>an alternative to the second ALTER (and maybe alternatives like UPDATE
>>and CHANGE) and make both TYPE and COLUMN optional.
>>- ALTER TABLE ident RENAME COLUMN qualifiedName TO qualifiedName:
>>This syntax is supported by PostgreSQL, Oracle, and DB2. MySQL uses the
>>same syntax as Hive and it appears that SQL server doesn’t have this
>>statement. This also match the table rename syntax, which uses TO.
>>- ALTER TABLE ident DROP (COLUMN | COLUMNS) qualifiedNameList: This
>>matches PostgreSQL, Oracle, DB2, and SQL server. MySQL makes COLUMN
>>optional. Most don’t allow deleting multiple columns, but it’s a 
>> reasonable
>>extension.
>>
>> While we’re on the subject of ALTER TABLE DDL, I should note that all of
>> the databases use ADD COLUMN syntax that differs from Hive (and
>> currently, Spark):
>>
>>- ALTER TABLE ident ADD COLUMN qualifiedName dataType (','
>>qualifiedName dataType)*: All other databases I looked at use ADD
>>COLUMN, but not all of them support adding m

Re: [DISCUSS] Syntax for table DDL

2018-10-04 Thread Ryan Blue
Sounds good. I'll plan on adding a PR with Hive's CHANGE syntax in addition
to what I've proposed here.

I have all of these working in our Spark distribution, so I'm just waiting
on finalizing the TableCatalog API to submit these upstream.

On Wed, Oct 3, 2018 at 10:07 PM Wenchen Fan  wrote:

> Thank you Ryan for proposing the DDL syntax! I think it's good to follow
> mainstream databases, and the proposed syntax looks very reasonable.
>
> About Hive compatibility, I think it's not that important now, but it's
> still good if we keep it. Shall we support the Hive syntax as an
> alternative? It seems not very hard, just a few more ANTLR rules. It will
> be better if we can make ANTLR extensible and allow other data sources to
> define custom SQL syntax.
>
> Anyway I think they are orthogonal. We can go ahead with the proposed
> syntax here, and add Hive compatible syntax later.
>
> On Tue, Oct 2, 2018 at 11:50 PM Ryan Blue 
> wrote:
>
>> I'd say that it was important to be compatible with Hive in the past, but
>> that's becoming less important over time. Spark is well established with
>> Hadoop users and I think the focus moving forward should be to make Spark
>> more predictable as a SQL engine for people coming from more traditional
>> databases..
>>
>> That said, I think there is no problem supporting the alter syntax for
>> both Hive/MySQL and the more standard versions.
>>
>> On Tue, Oct 2, 2018 at 8:35 AM Felix Cheung 
>> wrote:
>>
>>> I think it has been an important “selling point” that Spark is “mostly
>>> compatible“ with Hive DDL.
>>>
>>> I have see a lot of teams suffering from switching between Presto and
>>> Hive dialects.
>>>
>>> So one question I have is, we are at a point of switch from Hive
>>> compatible to ANSI SQL, say?
>>>
>>> Perhaps a more critical question, what does it take to get the platform
>>> to support both, by making the ANTLR extensible?
>>>
>>>
>>>
>>> --
>>> *From:* Alessandro Solimando 
>>> *Sent:* Tuesday, October 2, 2018 12:35 AM
>>> *To:* rb...@netflix.com
>>> *Cc:* Xiao Li; dev
>>> *Subject:* Re: [DISCUSS] Syntax for table DDL
>>>
>>> I agree with Ryan, a "standard" and more widely adopted syntax is
>>> usually a good idea, with possibly some slight improvements like "bulk
>>> deletion" of columns (especially because both the syntax and the semantics
>>> are clear), rather than stay with Hive syntax at any cost.
>>>
>>> I am personally following this PR with a lot of interest, thanks for all
>>> the work along this direction.
>>>
>>> Best regards,
>>> Alessandro
>>>
>>> On Mon, 1 Oct 2018 at 20:21, Ryan Blue 
>>> wrote:
>>>
>>>> What do you mean by consistent with the syntax in SqlBase.g4? These
>>>> aren’t currently defined, so we need to decide what syntax to support.
>>>> There are more details below, but the syntax I’m proposing is more standard
>>>> across databases than Hive, which uses confusing and non-standard syntax.
>>>>
>>>> I doubt that we want to support Hive syntax for a few reasons. Hive
>>>> uses the same column CHANGE statement for multiple purposes, so it
>>>> ends up with strange patterns for simple tasks, like updating the column’s
>>>> type:
>>>>
>>>> ALTER TABLE t CHANGE a1 a1 INT;
>>>>
>>>> The column name is doubled because old name, new name, and type are
>>>> always required. So you have to know the type of a column to change its
>>>> name and you have to double up the name to change its type. Hive also
>>>> allows a couple other oddities:
>>>>
>>>>- Column reordering with FIRST and AFTER keywords. Column
>>>>reordering is tricky to get right so I’m not sure we want to add it.
>>>>- RESTRICT and CASCADE to signal whether to change all partitions
>>>>or not. Spark doesn’t support partition-level schemas except through 
>>>> Hive,
>>>>and even then I’m not sure how reliable it is.
>>>>
>>>> I know that we wouldn’t necessarily have to support these features from
>>>> Hive, but I’m pointing them out to ask the question: why copy Hive’s syntax
>>>> if it is unlikely that Spark will implement all of the “features”? I’d
>>>> rather go with SQL syntax from databases li

Spark SQL parser and DDL

2018-10-04 Thread Ryan Blue
Hi everyone,

I’ve been working on SQL DDL statements for v2 tables lately, including the
proposed additions to drop, rename, and alter columns. The most recent
update I’ve added is to allow transformation functions in the PARTITION BY
clause to pass to v2 data sources. This allows sources like Iceberg to do
partition pruning internally.

One of the difficulties has been that the SQL parser is coupled to the
current logical plans and includes details that are specific to them. For
example, data source table creation makes determinations like the EXTERNAL
keyword is not allowed and instead the mode (external or managed) is set
depending on whether a path is set. It also translates IF NOT EXISTS into a
SaveMode and introduces a few other transformations.

The main problem with this is that converting the SQL plans produced by the
parser to v2 plans requires interpreting these alterations and not the
original SQL. Another consequence is that there are two parsers: AstBuilder
in spark-catalyst and SparkSqlParser in spark-sql (core) because not all of
the plans are available to the parser in the catalyst module.

I think it would be cleaner if we added a sql package with catalyst plans
that carry the SQL options as they were parsed, and then convert those
plans to specific implementations depending on the tables that are used.
That makes support for v2 plans much cleaner by converting from a generic
SQL plan instead of creating a v1 plan that assumes a data source table and
then converting that to a v2 plan (playing telephone with logical plans).

This has simplified the work I’ve been doing to add PARTITION BY
transformations. Instead of needing to add transformations to the
CatalogTable metadata that’s used everywhere, this only required a change
to the rule that converts from the parsed SQL plan to CatalogTable-based v1
plans. It is also cleaner to have the logic for converting to CatalogTable
in DataSourceAnalysis instead of in the parser itself.

Are there objections to this approach for integrating v2 plans?
-- 
Ryan Blue
Software Engineer
Netflix


Re: Data source V2 in spark 2.4.0

2018-10-04 Thread Ryan Blue
Assaf, thanks for the feedback.

The InternalRow issue is one we know about. If it helps, I wrote up some docs
for InternalRow
<https://github.com/apache/spark/blob/64da2971a1f083926df35fe1366bcba84d97c7b7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/data/package.scala>
as
part of SPARK-23657 <https://issues.apache.org/jira/browse/SPARK-23657>. It
may be a good idea to make it easier for people to produce InternalRow, but
we want to be careful not to mislead people down paths that have bad
performance. InternalRow is what Spark will use directly for filters and we
don't want to do too much conversion. We shouldn't make it deceptively easy
to work with Row instead of InternalRow because Row is going to be slower.

For bad rows, I would suggest using a filtered iterator
<https://google.github.io/guava/releases/21.0/api/docs/com/google/common/collect/Iterables.html#filter-java.lang.Iterable-com.google.common.base.Predicate->
to solve your problem. How to handle invalid rows isn't really a concern
Spark should handle. Using a filtered iterator would give you the
hasNext/next methods you're looking for to implement Spark's API.

Metrics are something that we still need to add. I think that Spark should
handle record count and FS bytes read metrics like it does for other
sources (I've been meaning to contribute an implementation for DSv2). Bad
records may be a good candidate for requesting accumulators in the v2 API.

rb

On Thu, Oct 4, 2018 at 11:32 AM assaf.mendelson 
wrote:

> Thanks for the info.
>
> I have been converting an internal data source to V2 and am now preparing
> it
> for 2.4.0.
>
> I have a couple of suggestions from my experience so far.
>
> First I believe we are missing documentation on this. I am currently
> writing
> an internal tutorial based on what I am learning, I would be happy to share
> it once it gets a little better (not sure where it should go though).
>
>
>
> The change from using Row to using InternalRow is a little confusing.
> For generic row we can do Row.fromSeq(values) where values are regular java
> types (matching the schema). This even includes more complex types like
> Array[String] and everything just works.
>
> For IntrenalRow, this doesn't work for non trivial types. I figured out how
> to convert strings and timestamps (hopefully I even did it correctly)  but
> I
> couldn't figure Array[String].
>
> Beyond the fact that I would love to learn how to do the conversion
> correctly for various types (such as array), I would suggest we should add
> some method to create the internal row from base types. In the 2.3.0
> version, the row we got from Get would be encoded via an encoder which was
> provided. I managed to get it to work by doing:
>
> val encoder = RowEncoder.apply(schema).resolveAndBind() in the constructor
> and then encoder.toRow(Row.fromSeq(values))
>
> this simply feels a little weird to me.
>
>
> Another issue that I encountered is handling bad data. In our legacy source
> we have cases where a specific row is bad. What we would do in non spark
> code is simply skip it.
>
> The problem is that in spark, if we put next to be true we must have some
> row for the get function. This means we always need to read records ahead
> to
> figure out if we actually ha something or not.
>
> Might we instead be allowed to return null from get in which case the line
> would just be skipped?
>
>
> Lastly I would be happy for a means to return metrics from the reading (how
> many records we read, how many bad records we have). Perhaps by allowing to
> use accumulators in the data source?
>
> Sorry for the long winded message, I will probably have more as I continue
> to explore this.
>
> Thanks,
>Assaf.
>
>
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-19 Thread Ryan Blue
>> MyDataSourceReader@3095c449 are being created. Consequently schema is
>> null in MyDataSourceReader@3095c449.
>>
>> Am I not doing it the correct way?
>>
>> Thanks,
>> Shubham
>>
>> On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf 
>> wrote:
>>
>>> I am using v2.4.0-RC2
>>>
>>>
>>>
>>> The code as is wouldn’t run (e.g. planBatchInputPartitions returns
>>> null). How are you calling it?
>>>
>>>
>>>
>>> When I do:
>>>
>>> Val df = spark.read.format(mypackage).load().show()
>>>
>>> I am getting a single creation, how are you creating the reader?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Assaf
>>>
>>>
>>>
>>> *From:* Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
>>> *Sent:* Tuesday, October 9, 2018 2:02 PM
>>> *To:* Mendelson, Assaf; u...@spark.apache.org
>>> *Subject:* Re: DataSourceV2 APIs creating multiple instances of
>>> DataSourceReader and hence not preserving the state
>>>
>>>
>>>
>>> [EXTERNAL EMAIL]
>>> Please report any suspicious attachments, links, or requests for
>>> sensitive information.
>>>
>>> Thanks Assaf, you tried with *tags/v2.4.0-rc2?*
>>>
>>>
>>>
>>> Full Code:
>>>
>>>
>>>
>>> MyDataSource is the entry point which simply creates Reader and Writer
>>>
>>>
>>>
>>> public class MyDataSource implements DataSourceV2, WriteSupport,
>>> ReadSupport, SessionConfigSupport {
>>>
>>>
>>>
>>>   @Override public DataSourceReader createReader(DataSourceOptions
>>> options) {
>>>
>>> return new MyDataSourceReader(options.asMap());
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override
>>>
>>>   public Optional createWriter(String jobId,
>>> StructType schema,
>>>
>>>   SaveMode mode, DataSourceOptions options) {
>>>
>>> // creates a dataSourcewriter here..
>>>
>>> return Optional.of(dataSourcewriter);
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override public String keyPrefix() {
>>>
>>> return "myprefix";
>>>
>>>   }
>>>
>>>
>>>
>>> }
>>>
>>>
>>>
>>> public class MyDataSourceReader implements DataSourceReader,
>>> SupportsScanColumnarBatch {
>>>
>>>
>>>
>>>   StructType schema = null;
>>>
>>>   Map options;
>>>
>>>
>>>
>>>   public MyDataSourceReader(Map options) {
>>>
>>> System.out.println("MyDataSourceReader.MyDataSourceReader:
>>> Instantiated" + this);
>>>
>>> this.options = options;
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override
>>>
>>>   public List> planBatchInputPartitions() {
>>>
>>> //variable this.schema is null here since readSchema() was called on
>>> a different instance
>>>
>>> System.out.println("MyDataSourceReader.planBatchInputPartitions: " +
>>> this + " schema: " + this.schema);
>>>
>>> //more logic..
>>>
>>> return null;
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override
>>>
>>>   public StructType readSchema() {
>>>
>>> //some logic to discover schema
>>>
>>> this.schema = (new StructType())
>>>
>>> .add("col1", "int")
>>>
>>> .add("col2", "string");
>>>
>>> System.out.println("MyDataSourceReader.readSchema: " + this + "
>>> schema: " + this.schema);
>>>
>>> return this.schema;
>>>
>>>   }
>>>
>>> }
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Shubham
>>>
>>>
>>>
>>> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf 
>>> wrote:
>>>
>>> Could you add a fuller code example? I tried to reproduce it in my
>>> environment and I am getting just one instance of the reader…
>>>
>>>
>>>
>>> Thanks,
>>>
>>> 

Re: [VOTE] SPARK 2.4.0 (RC4)

2018-10-23 Thread Ryan Blue
+1 (non-binding)

The Iceberg implementation of DataSourceV2 is passing all tests after
updating to the 2.4 API, although I've had to disable ORC support because
BufferHolder is no longer public.

One oddity is that the DSv2 API for batch sources now includes an epoch ID,
which I think will be removed in the refactor before 2.5 or 3.0 and wasn't
part of the 2.3 release. That's strange, but it's minor.

rb

On Tue, Oct 23, 2018 at 5:10 PM Sean Owen  wrote:

> Hm, so you're trying to build a source release from a binary release?
> I don't think that needs to work nor do I expect it to for reasons
> like this. They just have fairly different things.
>
> On Tue, Oct 23, 2018 at 7:04 PM Dongjoon Hyun 
> wrote:
> >
> > Ur, Wenchen.
> >
> > Source distribution seems to fail by default.
> >
> >
> https://dist.apache.org/repos/dist/dev/spark/v2.4.0-rc4-bin/spark-2.4.0.tgz
> >
> > $ dev/make-distribution.sh -Pyarn -Phadoop-2.7 -Pkinesis-asl -Phive
> -Phive-thriftserver
> > ...
> > + cp /spark-2.4.0/LICENSE-binary /spark-2.4.0/dist/LICENSE
> > cp: /spark-2.4.0/LICENSE-binary: No such file or directory
> >
> >
> > The root cause seems to be the following fix.
> >
> >
> https://github.com/apache/spark/pull/22436/files#diff-01ca42240614718522afde4d4885b40dR175
> >
> > Although Apache Spark provides the binary distributions, it would be
> great if this succeeds out of the box.
> >
> > Bests,
> > Dongjoon.
> >
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

-- 
Ryan Blue
Software Engineer
Netflix


DataSourceV2 hangouts sync

2018-10-25 Thread Ryan Blue
Hi everyone,

There's been some great discussion for DataSourceV2 in the last few months,
but it has been difficult to resolve some of the discussions and I don't
think that we have a very clear roadmap for getting the work done.

To coordinate better as a community, I'd like to start a regular sync-up
over google hangouts. We use this in the Parquet community to have more
effective community discussions about thorny technical issues and to get
aligned on an overall roadmap. It is really helpful in that community and I
think it would help us get DSv2 done more quickly.

Here's how it works: people join the hangout, we go around the list to
gather topics, have about an hour-long discussion, and then send a summary
of the discussion to the dev list for anyone that couldn't participate.
That way we can move topics along, but we keep the broader community in the
loop as well for further discussion on the mailing list.

I'll volunteer to set up the sync and send invites to anyone that wants to
attend. If you're interested, please reply with the email address you'd
like to put on the invite list (if there's a way to do this without
specific invites, let me know). Also for the first sync, please note what
times would work for you so we can try to account for people in different
time zones.

For the first one, I was thinking some day next week (time TBD by those
interested) and starting off with a general roadmap discussion before
diving into specific technical topics.

Thanks,

rb

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 hangouts sync

2018-10-25 Thread Ryan Blue
Since not many people have replied with a time window, how about we aim for
5PM PDT? That should work for Wenchen and most people here in the bay area.

If that makes it so some people can't attend, we can do the next one
earlier for people in Europe.

If we go with 5PM PDT, then what day works best for everyone?

On Thu, Oct 25, 2018 at 5:01 PM Wenchen Fan  wrote:

> Big +1 on this!
>
> I live in UTC+8 and I'm available from 8 am, which is 5 pm in the bay
> area. Hopefully we can coordinate a time that fits everyone.
>
> Thanks
> Wenchen
>
>
>
> On Fri, Oct 26, 2018 at 7:21 AM Dongjoon Hyun 
> wrote:
>
>> +1. Thank you for volunteering, Ryan!
>>
>> Bests,
>> Dongjoon.
>>
>>
>> On Thu, Oct 25, 2018 at 4:19 PM Xiao Li  wrote:
>>
>>> +1
>>>
>>> Reynold Xin  于2018年10月25日周四 下午4:16写道:
>>>
>>>> +1
>>>>
>>>>
>>>>
>>>> On Thu, Oct 25, 2018 at 4:12 PM Li Jin  wrote:
>>>>
>>>>> Although I am not specifically involved in DSv2, I think having this
>>>>> kind of meeting is definitely helpful to discuss, move certain effort
>>>>> forward and keep people on the same page. Glad to see this kind of working
>>>>> group happening.
>>>>>
>>>>> On Thu, Oct 25, 2018 at 5:58 PM John Zhuge  wrote:
>>>>>
>>>>>> Great idea!
>>>>>>
>>>>>> On Thu, Oct 25, 2018 at 1:10 PM Ryan Blue 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> There's been some great discussion for DataSourceV2 in the last few
>>>>>>> months, but it has been difficult to resolve some of the discussions 
>>>>>>> and I
>>>>>>> don't think that we have a very clear roadmap for getting the work done.
>>>>>>>
>>>>>>> To coordinate better as a community, I'd like to start a regular
>>>>>>> sync-up over google hangouts. We use this in the Parquet community to 
>>>>>>> have
>>>>>>> more effective community discussions about thorny technical issues and 
>>>>>>> to
>>>>>>> get aligned on an overall roadmap. It is really helpful in that 
>>>>>>> community
>>>>>>> and I think it would help us get DSv2 done more quickly.
>>>>>>>
>>>>>>> Here's how it works: people join the hangout, we go around the list
>>>>>>> to gather topics, have about an hour-long discussion, and then send a
>>>>>>> summary of the discussion to the dev list for anyone that couldn't
>>>>>>> participate. That way we can move topics along, but we keep the broader
>>>>>>> community in the loop as well for further discussion on the mailing 
>>>>>>> list.
>>>>>>>
>>>>>>> I'll volunteer to set up the sync and send invites to anyone that
>>>>>>> wants to attend. If you're interested, please reply with the email 
>>>>>>> address
>>>>>>> you'd like to put on the invite list (if there's a way to do this 
>>>>>>> without
>>>>>>> specific invites, let me know). Also for the first sync, please note 
>>>>>>> what
>>>>>>> times would work for you so we can try to account for people in 
>>>>>>> different
>>>>>>> time zones.
>>>>>>>
>>>>>>> For the first one, I was thinking some day next week (time TBD by
>>>>>>> those interested) and starting off with a general roadmap discussion 
>>>>>>> before
>>>>>>> diving into specific technical topics.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> rb
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> John Zhuge
>>>>>>
>>>>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 hangouts sync

2018-10-25 Thread Ryan Blue
Good point. How about Monday or Wednesday at 5PM PDT then?

Everyone, please reply to me (no need to spam the list) with which option
works for you and I'll send an invite for the one with the most votes.

On Thu, Oct 25, 2018 at 5:14 PM Wenchen Fan  wrote:

> Friday at the bay area is Saturday at my side, it will be great if we can
> pick a day from Monday to Thursday.
>
> On Fri, Oct 26, 2018 at 8:08 AM Ryan Blue  wrote:
>
>> Since not many people have replied with a time window, how about we aim
>> for 5PM PDT? That should work for Wenchen and most people here in the bay
>> area.
>>
>> If that makes it so some people can't attend, we can do the next one
>> earlier for people in Europe.
>>
>> If we go with 5PM PDT, then what day works best for everyone?
>>
>> On Thu, Oct 25, 2018 at 5:01 PM Wenchen Fan  wrote:
>>
>>> Big +1 on this!
>>>
>>> I live in UTC+8 and I'm available from 8 am, which is 5 pm in the bay
>>> area. Hopefully we can coordinate a time that fits everyone.
>>>
>>> Thanks
>>> Wenchen
>>>
>>>
>>>
>>> On Fri, Oct 26, 2018 at 7:21 AM Dongjoon Hyun 
>>> wrote:
>>>
>>>> +1. Thank you for volunteering, Ryan!
>>>>
>>>> Bests,
>>>> Dongjoon.
>>>>
>>>>
>>>> On Thu, Oct 25, 2018 at 4:19 PM Xiao Li  wrote:
>>>>
>>>>> +1
>>>>>
>>>>> Reynold Xin  于2018年10月25日周四 下午4:16写道:
>>>>>
>>>>>> +1
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 25, 2018 at 4:12 PM Li Jin  wrote:
>>>>>>
>>>>>>> Although I am not specifically involved in DSv2, I think having this
>>>>>>> kind of meeting is definitely helpful to discuss, move certain effort
>>>>>>> forward and keep people on the same page. Glad to see this kind of 
>>>>>>> working
>>>>>>> group happening.
>>>>>>>
>>>>>>> On Thu, Oct 25, 2018 at 5:58 PM John Zhuge 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Great idea!
>>>>>>>>
>>>>>>>> On Thu, Oct 25, 2018 at 1:10 PM Ryan Blue 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> There's been some great discussion for DataSourceV2 in the last
>>>>>>>>> few months, but it has been difficult to resolve some of the 
>>>>>>>>> discussions
>>>>>>>>> and I don't think that we have a very clear roadmap for getting the 
>>>>>>>>> work
>>>>>>>>> done.
>>>>>>>>>
>>>>>>>>> To coordinate better as a community, I'd like to start a regular
>>>>>>>>> sync-up over google hangouts. We use this in the Parquet community to 
>>>>>>>>> have
>>>>>>>>> more effective community discussions about thorny technical issues 
>>>>>>>>> and to
>>>>>>>>> get aligned on an overall roadmap. It is really helpful in that 
>>>>>>>>> community
>>>>>>>>> and I think it would help us get DSv2 done more quickly.
>>>>>>>>>
>>>>>>>>> Here's how it works: people join the hangout, we go around the
>>>>>>>>> list to gather topics, have about an hour-long discussion, and then 
>>>>>>>>> send a
>>>>>>>>> summary of the discussion to the dev list for anyone that couldn't
>>>>>>>>> participate. That way we can move topics along, but we keep the 
>>>>>>>>> broader
>>>>>>>>> community in the loop as well for further discussion on the mailing 
>>>>>>>>> list.
>>>>>>>>>
>>>>>>>>> I'll volunteer to set up the sync and send invites to anyone that
>>>>>>>>> wants to attend. If you're interested, please reply with the email 
>>>>>>>>> address
>>>>>>>>> you'd like to put on the invite list (if there's a way to do this 
>>>>>>>>> without
>>>>>>>>> specific invites, let me know). Also for the first sync, please note 
>>>>>>>>> what
>>>>>>>>> times would work for you so we can try to account for people in 
>>>>>>>>> different
>>>>>>>>> time zones.
>>>>>>>>>
>>>>>>>>> For the first one, I was thinking some day next week (time TBD by
>>>>>>>>> those interested) and starting off with a general roadmap discussion 
>>>>>>>>> before
>>>>>>>>> diving into specific technical topics.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> rb
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Software Engineer
>>>>>>>>> Netflix
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> John Zhuge
>>>>>>>>
>>>>>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 hangouts sync

2018-10-26 Thread Ryan Blue
Looks like the majority opinion is for Wednesday. I've sent out an invite
to everyone that replied and will add more people as I hear more responses.

Thanks, everyone!

On Fri, Oct 26, 2018 at 3:23 AM Gengliang Wang  wrote:

> +1
>
> On Oct 26, 2018, at 8:45 AM, Hyukjin Kwon  wrote:
>
> I didn't know I live in the same timezone with you Wenchen :D.
> Monday or Wednesday at 5PM PDT sounds good to me too FWIW.
>
> 2018년 10월 26일 (금) 오전 8:29, Ryan Blue 님이 작성:
>
>> Good point. How about Monday or Wednesday at 5PM PDT then?
>>
>> Everyone, please reply to me (no need to spam the list) with which option
>> works for you and I'll send an invite for the one with the most votes.
>>
>> On Thu, Oct 25, 2018 at 5:14 PM Wenchen Fan  wrote:
>>
>>> Friday at the bay area is Saturday at my side, it will be great if we
>>> can pick a day from Monday to Thursday.
>>>
>>> On Fri, Oct 26, 2018 at 8:08 AM Ryan Blue  wrote:
>>>
>>>> Since not many people have replied with a time window, how about we aim
>>>> for 5PM PDT? That should work for Wenchen and most people here in the bay
>>>> area.
>>>>
>>>> If that makes it so some people can't attend, we can do the next one
>>>> earlier for people in Europe.
>>>>
>>>> If we go with 5PM PDT, then what day works best for everyone?
>>>>
>>>> On Thu, Oct 25, 2018 at 5:01 PM Wenchen Fan 
>>>> wrote:
>>>>
>>>>> Big +1 on this!
>>>>>
>>>>> I live in UTC+8 and I'm available from 8 am, which is 5 pm in the bay
>>>>> area. Hopefully we can coordinate a time that fits everyone.
>>>>>
>>>>> Thanks
>>>>> Wenchen
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Oct 26, 2018 at 7:21 AM Dongjoon Hyun 
>>>>> wrote:
>>>>>
>>>>>> +1. Thank you for volunteering, Ryan!
>>>>>>
>>>>>> Bests,
>>>>>> Dongjoon.
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 25, 2018 at 4:19 PM Xiao Li  wrote:
>>>>>>
>>>>>>> +1
>>>>>>>
>>>>>>> Reynold Xin  于2018年10月25日周四 下午4:16写道:
>>>>>>>
>>>>>>>> +1
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 25, 2018 at 4:12 PM Li Jin 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Although I am not specifically involved in DSv2, I think having
>>>>>>>>> this kind of meeting is definitely helpful to discuss, move certain 
>>>>>>>>> effort
>>>>>>>>> forward and keep people on the same page. Glad to see this kind of 
>>>>>>>>> working
>>>>>>>>> group happening.
>>>>>>>>>
>>>>>>>>> On Thu, Oct 25, 2018 at 5:58 PM John Zhuge 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Great idea!
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 25, 2018 at 1:10 PM Ryan Blue <
>>>>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>
>>>>>>>>>>> There's been some great discussion for DataSourceV2 in the last
>>>>>>>>>>> few months, but it has been difficult to resolve some of the 
>>>>>>>>>>> discussions
>>>>>>>>>>> and I don't think that we have a very clear roadmap for getting the 
>>>>>>>>>>> work
>>>>>>>>>>> done.
>>>>>>>>>>>
>>>>>>>>>>> To coordinate better as a community, I'd like to start a regular
>>>>>>>>>>> sync-up over google hangouts. We use this in the Parquet community 
>>>>>>>>>>> to have
>>>>>>>>>>> more effective community discussions about thorny technical issues 
>>>>>>>>>>> and to
>>>>>>>>>>> get aligned on an overall roadmap. It is really helpful in that 
>>>>>>>>>>> community
>>>>>>>>>>> and I think it would help us get DSv2 done more quickly.
>>>>>>>>>>>
>>>>>>>>>>> Here's how it works: people join the hangout, we go around the
>>>>>>>>>>> list to gather topics, have about an hour-long discussion, and then 
>>>>>>>>>>> send a
>>>>>>>>>>> summary of the discussion to the dev list for anyone that couldn't
>>>>>>>>>>> participate. That way we can move topics along, but we keep the 
>>>>>>>>>>> broader
>>>>>>>>>>> community in the loop as well for further discussion on the mailing 
>>>>>>>>>>> list.
>>>>>>>>>>>
>>>>>>>>>>> I'll volunteer to set up the sync and send invites to anyone
>>>>>>>>>>> that wants to attend. If you're interested, please reply with the 
>>>>>>>>>>> email
>>>>>>>>>>> address you'd like to put on the invite list (if there's a way to 
>>>>>>>>>>> do this
>>>>>>>>>>> without specific invites, let me know). Also for the first sync, 
>>>>>>>>>>> please
>>>>>>>>>>> note what times would work for you so we can try to account for 
>>>>>>>>>>> people in
>>>>>>>>>>> different time zones.
>>>>>>>>>>>
>>>>>>>>>>> For the first one, I was thinking some day next week (time TBD
>>>>>>>>>>> by those interested) and starting off with a general roadmap 
>>>>>>>>>>> discussion
>>>>>>>>>>> before diving into specific technical topics.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> rb
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Ryan Blue
>>>>>>>>>>> Software Engineer
>>>>>>>>>>> Netflix
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> John Zhuge
>>>>>>>>>>
>>>>>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 hangouts sync

2018-10-29 Thread Ryan Blue
Everyone,

There are now 25 guests invited, which is a lot of people to actively
participate in a sync like this.

For those of you who probably won't actively participate, I've added a live
stream. If you don't plan to talk, please use the live stream instead of
the meet/hangout so that we don't end up with so many people that we can't
actually get the discussion going. Here's a link to the stream:

https://stream.meet.google.com/stream/6be59d80-04c7-44dc-9042-4f3b597fc8ba

Thanks!

rb

On Thu, Oct 25, 2018 at 1:09 PM Ryan Blue  wrote:

> Hi everyone,
>
> There's been some great discussion for DataSourceV2 in the last few
> months, but it has been difficult to resolve some of the discussions and I
> don't think that we have a very clear roadmap for getting the work done.
>
> To coordinate better as a community, I'd like to start a regular sync-up
> over google hangouts. We use this in the Parquet community to have more
> effective community discussions about thorny technical issues and to get
> aligned on an overall roadmap. It is really helpful in that community and I
> think it would help us get DSv2 done more quickly.
>
> Here's how it works: people join the hangout, we go around the list to
> gather topics, have about an hour-long discussion, and then send a summary
> of the discussion to the dev list for anyone that couldn't participate.
> That way we can move topics along, but we keep the broader community in the
> loop as well for further discussion on the mailing list.
>
> I'll volunteer to set up the sync and send invites to anyone that wants to
> attend. If you're interested, please reply with the email address you'd
> like to put on the invite list (if there's a way to do this without
> specific invites, let me know). Also for the first sync, please note what
> times would work for you so we can try to account for people in different
> time zones.
>
> For the first one, I was thinking some day next week (time TBD by those
> interested) and starting off with a general roadmap discussion before
> diving into specific technical topics.
>
> Thanks,
>
> rb
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: [VOTE] SPARK 2.4.0 (RC5)

2018-10-30 Thread Ryan Blue
versions/12342385
>>> >> >>
>>> >> >> FAQ
>>> >> >>
>>> >> >> =
>>> >> >> How can I help test this release?
>>> >> >> =
>>> >> >>
>>> >> >> If you are a Spark user, you can help us test this release by
>>> taking
>>> >> >> an existing Spark workload and running on this release candidate,
>>> then
>>> >> >> reporting any regressions.
>>> >> >>
>>> >> >> If you're working in PySpark you can set up a virtual env and
>>> install
>>> >> >> the current RC and see if anything important breaks, in the
>>> Java/Scala
>>> >> >> you can add the staging repository to your projects resolvers and
>>> test
>>> >> >> with the RC (make sure to clean up the artifact cache before/after
>>> so
>>> >> >> you don't end up building with a out of date RC going forward).
>>> >> >>
>>> >> >> ===
>>> >> >> What should happen to JIRA tickets still targeting 2.4.0?
>>> >> >> ===
>>> >> >>
>>> >> >> The current list of open tickets targeted at 2.4.0 can be found at:
>>> >> >> https://issues.apache.org/jira/projects/SPARK and search for
>>> "Target Version/s" = 2.4.0
>>> >> >>
>>> >> >> Committers should look at those and triage. Extremely important bug
>>> >> >> fixes, documentation, and API tweaks that impact compatibility
>>> should
>>> >> >> be worked on immediately. Everything else please retarget to an
>>> >> >> appropriate release.
>>> >> >>
>>> >> >> ==
>>> >> >> But my bug isn't fixed?
>>> >> >> ==
>>> >> >>
>>> >> >> In order to make timely releases, we will typically not hold the
>>> >> >> release unless the bug in question is a regression from the
>>> previous
>>> >> >> release. That being said, if there is something which is a
>>> regression
>>> >> >> that has not been correctly targeted please ping me or a committer
>>> to
>>> >> >> help target the issue.
>>> >> >
>>> >> >
>>> -
>>> >> > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>> >> >
>>> >>
>>> >>
>>> >> -
>>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>> >>
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>>
>>
>> --
>> [image: Spark+AI Summit North America 2019]
>> <http://t.sidekickopen24.com/s1t/c/5/f18dQhb0S7lM8dDMPbW2n0x6l2B9nMJN7t5X-FfhMynN2z8MDjQsyTKW56dzQQ1-_gV6102?t=https%3A%2F%2Fdatabricks.com%2Fsparkaisummit%2Fnorth-america&si=undefined&pi=406b8c9a-b648-4923-9ed1-9a51ffe213fa>
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 hangouts sync

2018-11-01 Thread Ryan Blue
Thanks to everyone that attended the sync! We had some good discussions.
Here are my notes for anyone that missed it or couldn’t join the live
stream. If anyone wants to add to this, please send additional thoughts or
corrections.

*Attendees:*

   - Ryan Blue - Netflix - Using v2 to integrate Iceberg with Spark. SQL,
   DDL/schema evolution, delete support, and hidden partitioning working in
   Netflix’s branch.
   - John Zhuge - Netflix - Working on multi-catalog support.
   - Felix Cheung - Uber - Interested in integrating Uber data sources.
   External catalog would be useful
   - Reynold Xin - DataBricks - Working more on SQL and sources
   - Arun M - HortonWorks - Interested in streaming, unified continuous and
   micro-batch modes
   - Dale Richardson - Private developer - Interested in non-FS based
   partitions
   - Dongjoon Hyun - HortonWorks - Looking at ORC support in v2,
   transactional processing on the write side, data lineage
   - Genglian Wang - DataBricks - ORC with v2 API
   - Hyukjin Kwon - HortonWorks - DSv2 API implementation for Hive
   warehouse, LLAP
   - Kevin Yu - IBM - Design for v2
   - Matt Cheah - Palantir - Interested in integrating a custom data store
   - Thomas D’Silva - Salesforce - Interested in a v2 Phoenix connector
   - Wenchen Fan - DataBricks - Proposed DSv2
   - Xiao Li - DataBricks - SparkSQL, reviews v2 patches
   - Yuanjian Li - Interested in continuous streaming, new catalog API

*Goals for this sync:*

   - Build consensus around the context that affects roadmap planning
   - Set priorities and some reasonable milestones
   - In the future, we’ll open things up to more general technical
   discussions, but we will be more effective if we are aligned first.

*Conclusions:*

   - Current blocker is Wenchen’s API update. Please review the refactor
   proposal
   
<https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit#heading=h.l22hv3trducc>
   and PR #22547 <https://github.com/apache/spark/pull/22547>
   - Catalog support: this is a high priority blocker for SQL and real use
   of DSv2
  - Please review the TableCatalog SPIP proposal
  
<https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#heading=h.m45webtwxf2d>
  and the implementation, PR #21306
  <https://github.com/apache/spark/pull/21306>
  - A proposal for incrementally introducing catalogs is in PR #21978
  <https://github.com/apache/spark/pull/21978>
  - Generic and specific catalog support should use the same API
  - Replacing the current global catalog will be done in parts with
  more specific APIs like TableCatalog and FunctionCatalog
   - Behavior compatibility:
  - V2 will have well-defined behavior, primarily implemented by Spark
  to ensure consistency across sources (e.g. CTAS)
  - Uses of V1 sources should not see behavior changes when sources are
  updated to use V2.
  - Reconciling these concerns is difficult. File-based sources may
  need to implement compatibility hacks, like checking
  spark.sql.sources.partitionOverwriteMode
  - Explicit overwrite is preferred to automatic partition overwrite.
  This mechanism could be used to translate some behaviors of INSERT
  OVERWRITE ... PARTITION for V2 sources.

*Context that affects roadmap planning:*

This is a *summary* of the long discussion, not quotes. It may not be in
the right order, but I think it captures the highlights.

   -

   The community adopted the SPIP to standardize logical plans
   
<https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>
   and this requires a catalog API for sources
   
<https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#heading=h.m45webtwxf2d>.
   With multi-catalog support also coming soon, it makes sense to incorporate
   this in the design and planning.
   - Wenchen mentioned that there are two types of catalogs. First, generic
  catalogs that can track tables with configurable implementations
(like the
  current catalog that can track Parquet, JDBC, JSON, etc. tables). Second,
  there are specific catalogs that expose a certain type of table (like a
  JDBC catalog that exposes all of the tables in a relational DB or a
  Cassandra catalog).
  - Ryan: we should be able to use the same catalog API for both of
  these use cases.
  - Reynold: DataBricks is interested in a catalog API and it should
  replace the existing API. Replacing the existing API is difficult because
  there are many concerns, like tracking functions. The current API is
  complicated and may be difficult to replace.
  - Ryan: Past discussions have suggested replacing the current catalog
  API in pieces, like the proposed TableCatalog API for named tables, a
  FunctionCatalog API to track UDFs, and a Pat

Re: Make Scala 2.12 as default Scala version in Spark 3.0

2018-11-06 Thread Ryan Blue
+1 to Scala 2.12 as the default in Spark 3.0.

On Tue, Nov 6, 2018 at 11:50 AM DB Tsai  wrote:

> +1 on dropping Scala 2.11 in Spark 3.0 to simplify the build.
>
> As Scala 2.11 will not support Java 11 unless we make a significant
> investment, if we decide not to drop Scala 2.11 in Spark 3.0, what we can
> do is have only Scala 2.12 build support Java 11 while Scala 2.11 support
> Java 8. But I agree with Sean that this can make the decencies really
> complicated; hence I support to drop Scala 2.11 in Spark 3.0 directly.
>
> DB Tsai  |  Siri Open Source Technologies [not a contribution]  |  
> Apple, Inc
>
> On Nov 6, 2018, at 11:38 AM, Sean Owen  wrote:
>
> I think we should make Scala 2.12 the default in Spark 3.0. I would
> also prefer to drop Scala 2.11 support in 3.0. In theory, not dropping
> 2.11 support it means we'd support Scala 2.11 for years, the lifetime
> of Spark 3.x. In practice, we could drop 2.11 support in a 3.1.0 or
> 3.2.0 release, kind of like what happened with 2.10 in 2.x.
>
> Java (9-)11 support also complicates this. I think getting it to work
> will need some significant dependency updates, and I worry not all
> will be available for 2.11 or will present some knotty problems. We'll
> find out soon if that forces the issue.
>
> Also note that Scala 2.13 is pretty close to release, and we'll want
> to support it soon after release, perhaps sooner than the long delay
> before 2.12 was supported (because it was hard!). It will probably be
> out well before Spark 3.0. Cross-compiling for 3 Scala versions sounds
> like too much. 3.0 could support 2.11 and 2.12, and 3.1 support 2.12
> and 2.13, or something. But if 2.13 support is otherwise attainable at
> the release of Spark 3.0, I wonder if that too argues for dropping
> 2.11 support.
>
> Finally I'll say that Spark itself isn't dropping 2.11 support for a
> while, no matter what; it still exists in the 2.4.x branch of course.
> People who can't update off Scala 2.11 can stay on Spark 2.x, note.
>
> Sean
>
>
> On Tue, Nov 6, 2018 at 1:13 PM DB Tsai  wrote:
>
>
> We made Scala 2.11 as default Scala version in Spark 2.0. Now, the next
> Spark version will be 3.0, so it's a great time to discuss should we make
> Scala 2.12 as default Scala version in Spark 3.0.
>
> Scala 2.11 is EOL, and it came out 4.5 ago; as a result, it's unlikely to
> support JDK 11 in Scala 2.11 unless we're willing to sponsor the needed
> work per discussion in Scala community,
> https://github.com/scala/scala-dev/issues/559#issuecomment-436160166
>
> We have initial support of Scala 2.12 in Spark 2.4. If we decide to make
> Scala 2.12 as default for Spark 3.0 now, we will have ample time to work on
> bugs and issues that we may run into.
>
> What do you think?
>
> Thanks,
>
> DB Tsai  |  Siri Open Source Technologies [not a contribution]  |  
> Apple, Inc
>
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> 
>
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: Test and support only LTS JDK release?

2018-11-06 Thread Ryan Blue
+1 for supporting LTS releases.

On Tue, Nov 6, 2018 at 11:48 AM Robert Stupp  wrote:

> +1 on supporting LTS releases.
>
> VM distributors (RedHat, Azul - to name two) want to provide patches to
> LTS versions (i.e. into http://hg.openjdk.java.net/jdk-updates/jdk11u/).
> How that will play out in reality ... I don't know. Whether Oracle will
> contribute to that repo for 8 after it's EOL and 11 after the 6 month cycle
> ... we will see. Most Linux distributions promised(?) long-term support for
> Java 11 in their LTS releases (e.g. Ubuntu 18.04). I am not sure what that
> exactly means ... whether they will actively provide patches to OpenJDK or
> whether they just build from source.
>
> But considering that, I think it's definitely worth to at least keep an
> eye on Java 12 and 13 - even if those are just EA. Java 12 for example does
> already forbid some "dirty tricks" that are still possible in Java 11.
>
>
> On 11/6/18 8:32 PM, DB Tsai wrote:
>
> OpenJDK will follow Oracle's release cycle,
> https://openjdk.java.net/projects/jdk/, a strict six months model. I'm
> not familiar with other non-Oracle VMs and Redhat support.
>
> DB Tsai  |  Siri Open Source Technologies [not a contribution]  |  
> Apple, Inc
>
> On Nov 6, 2018, at 11:26 AM, Reynold Xin  wrote:
>
> What does OpenJDK do and other non-Oracle VMs? I know there was a lot of
> discussions from Redhat etc to support.
>
>
> On Tue, Nov 6, 2018 at 11:24 AM DB Tsai  wrote:
>
>> Given Oracle's new 6-month release model, I feel the only realistic
>> option is to only test and support JDK such as JDK 11 LTS and future LTS
>> release. I would like to have a discussion on this in Spark community.
>>
>> Thanks,
>>
>> DB Tsai  |  Siri Open Source Technologies [not a contribution]  |  
>> Apple, Inc
>>
>>
> --
> Robert Stupp
> @snazy
>
>

-- 
Ryan Blue
Software Engineer
Netflix


DataSourceV2 capability API

2018-11-08 Thread Ryan Blue
Hi everyone,

I’d like to propose an addition to DataSourceV2 tables, a capability API.
This API would allow Spark to query a table to determine whether it
supports a capability or not:

val table = catalog.load(identifier)
val supportsContinuous = table.isSupported("continuous-streaming")

There are a couple of use cases for this. First, we want to be able to fail
fast when a user tries to stream a table that doesn’t support it. The
design of our read implementation doesn’t necessarily support this. If we
want to share the same “scan” across streaming and batch, then we need to
“branch” in the API after that point, but that is at odds with failing
fast. We could use capabilities to fail fast and not worry about that
concern in the read design.

I also want to use capabilities to change the behavior of some validation
rules. The rule that validates appends, for example, doesn’t allow a write
that is missing an optional column. That’s because the current v1 sources
don’t support reading when columns are missing. But Iceberg does support
reading a missing column as nulls, so that users can add a column to a
table without breaking a scheduled job that populates the table. To fix
this problem, I would use a table capability, like
read-missing-columns-as-null.

Any comments on this approach?

rb
-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 capability API

2018-11-08 Thread Ryan Blue
Yes, we currently use traits that have methods. Something like “supports
reading missing columns” doesn’t need to deliver methods. The other example
is where we don’t have an object to test for a trait (
scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown done.
That could be expensive so we can use a capability to fail faster.

On Thu, Nov 8, 2018 at 1:54 PM Reynold Xin  wrote:

> This is currently accomplished by having traits that data sources can
> extend, as well as runtime exceptions right? It's hard to argue one way vs
> another without knowing how things will evolve (e.g. how many different
> capabilities there will be).
>
>
> On Thu, Nov 8, 2018 at 12:50 PM Ryan Blue 
> wrote:
>
>> Hi everyone,
>>
>> I’d like to propose an addition to DataSourceV2 tables, a capability API.
>> This API would allow Spark to query a table to determine whether it
>> supports a capability or not:
>>
>> val table = catalog.load(identifier)
>> val supportsContinuous = table.isSupported("continuous-streaming")
>>
>> There are a couple of use cases for this. First, we want to be able to
>> fail fast when a user tries to stream a table that doesn’t support it. The
>> design of our read implementation doesn’t necessarily support this. If we
>> want to share the same “scan” across streaming and batch, then we need to
>> “branch” in the API after that point, but that is at odds with failing
>> fast. We could use capabilities to fail fast and not worry about that
>> concern in the read design.
>>
>> I also want to use capabilities to change the behavior of some validation
>> rules. The rule that validates appends, for example, doesn’t allow a write
>> that is missing an optional column. That’s because the current v1 sources
>> don’t support reading when columns are missing. But Iceberg does support
>> reading a missing column as nulls, so that users can add a column to a
>> table without breaking a scheduled job that populates the table. To fix
>> this problem, I would use a table capability, like
>> read-missing-columns-as-null.
>>
>> Any comments on this approach?
>>
>> rb
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 capability API

2018-11-09 Thread Ryan Blue
I'd have two places. First, a class that defines properties supported and
identified by Spark, like the SQLConf definitions. Second, in documentation
for the v2 table API.

On Fri, Nov 9, 2018 at 9:00 AM Felix Cheung 
wrote:

> One question is where will the list of capability strings be defined?
>
>
> --
> *From:* Ryan Blue 
> *Sent:* Thursday, November 8, 2018 2:09 PM
> *To:* Reynold Xin
> *Cc:* Spark Dev List
> *Subject:* Re: DataSourceV2 capability API
>
>
> Yes, we currently use traits that have methods. Something like “supports
> reading missing columns” doesn’t need to deliver methods. The other example
> is where we don’t have an object to test for a trait (
> scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown
> done. That could be expensive so we can use a capability to fail faster.
>
> On Thu, Nov 8, 2018 at 1:54 PM Reynold Xin  wrote:
>
>> This is currently accomplished by having traits that data sources can
>> extend, as well as runtime exceptions right? It's hard to argue one way vs
>> another without knowing how things will evolve (e.g. how many different
>> capabilities there will be).
>>
>>
>> On Thu, Nov 8, 2018 at 12:50 PM Ryan Blue 
>> wrote:
>>
>>> Hi everyone,
>>>
>>> I’d like to propose an addition to DataSourceV2 tables, a capability
>>> API. This API would allow Spark to query a table to determine whether it
>>> supports a capability or not:
>>>
>>> val table = catalog.load(identifier)
>>> val supportsContinuous = table.isSupported("continuous-streaming")
>>>
>>> There are a couple of use cases for this. First, we want to be able to
>>> fail fast when a user tries to stream a table that doesn’t support it. The
>>> design of our read implementation doesn’t necessarily support this. If we
>>> want to share the same “scan” across streaming and batch, then we need to
>>> “branch” in the API after that point, but that is at odds with failing
>>> fast. We could use capabilities to fail fast and not worry about that
>>> concern in the read design.
>>>
>>> I also want to use capabilities to change the behavior of some
>>> validation rules. The rule that validates appends, for example, doesn’t
>>> allow a write that is missing an optional column. That’s because the
>>> current v1 sources don’t support reading when columns are missing. But
>>> Iceberg does support reading a missing column as nulls, so that users can
>>> add a column to a table without breaking a scheduled job that populates the
>>> table. To fix this problem, I would use a table capability, like
>>> read-missing-columns-as-null.
>>>
>>> Any comments on this approach?
>>>
>>> rb
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: Behavior of SaveMode.Append when table is not present

2018-11-09 Thread Ryan Blue
Right now, it is up to the source implementation to decide what to do. I
think path-based tables (with no metastore component) treat an append as an
implicit create.

If you're thinking that relying on sources to interpret SaveMode is bad for
consistent behavior, I agree. That's why the community adopted a proposal
to standardize logical plans and the behavior
<https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>
expected of data sources for the v2 API.

On Thu, Nov 8, 2018 at 11:53 PM Shubham Chaurasia 
wrote:

> Hi,
>
> For SaveMode.Append, the doc
> https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
> says
>
> *When saving a DataFrame to a data source, if data/table already exists,
> contents of the DataFrame are expected to be appended to existing data*
>
> However it does not specify behavior when the table does not exist.
> Does that throw exception or create the table or a NO-OP?
>
> Thanks,
> Shubham
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 capability API

2018-11-09 Thread Ryan Blue
Do you have an example in mind where we might add a capability and break
old versions of data sources?

These are really for being able to tell what features a data source has. If
there is no way to report a feature (e.g., able to read missing as null)
then there is no way for Spark to take advantage of it in the first place.
For the uses I've proposed, forward compatibility isn't a concern. When we
add a capability, we add handling for it that old versions wouldn't be able
to use anyway. The advantage is that we don't have to treat all sources the
same.

On Fri, Nov 9, 2018 at 11:32 AM Reynold Xin  wrote:

> How do we deal with forward compatibility? Consider, Spark adds a new
> "property". In the past the data source supports that property, but since
> it was not explicitly defined, in the new version of Spark that data source
> would be considered not supporting that property, and thus throwing an
> exception.
>
>
> On Fri, Nov 9, 2018 at 9:11 AM Ryan Blue  wrote:
>
>> I'd have two places. First, a class that defines properties supported and
>> identified by Spark, like the SQLConf definitions. Second, in documentation
>> for the v2 table API.
>>
>> On Fri, Nov 9, 2018 at 9:00 AM Felix Cheung 
>> wrote:
>>
>>> One question is where will the list of capability strings be defined?
>>>
>>>
>>> --
>>> *From:* Ryan Blue 
>>> *Sent:* Thursday, November 8, 2018 2:09 PM
>>> *To:* Reynold Xin
>>> *Cc:* Spark Dev List
>>> *Subject:* Re: DataSourceV2 capability API
>>>
>>>
>>> Yes, we currently use traits that have methods. Something like “supports
>>> reading missing columns” doesn’t need to deliver methods. The other example
>>> is where we don’t have an object to test for a trait (
>>> scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown
>>> done. That could be expensive so we can use a capability to fail faster.
>>>
>>> On Thu, Nov 8, 2018 at 1:54 PM Reynold Xin  wrote:
>>>
>>>> This is currently accomplished by having traits that data sources can
>>>> extend, as well as runtime exceptions right? It's hard to argue one way vs
>>>> another without knowing how things will evolve (e.g. how many different
>>>> capabilities there will be).
>>>>
>>>>
>>>> On Thu, Nov 8, 2018 at 12:50 PM Ryan Blue 
>>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I’d like to propose an addition to DataSourceV2 tables, a capability
>>>>> API. This API would allow Spark to query a table to determine whether it
>>>>> supports a capability or not:
>>>>>
>>>>> val table = catalog.load(identifier)
>>>>> val supportsContinuous = table.isSupported("continuous-streaming")
>>>>>
>>>>> There are a couple of use cases for this. First, we want to be able to
>>>>> fail fast when a user tries to stream a table that doesn’t support it. The
>>>>> design of our read implementation doesn’t necessarily support this. If we
>>>>> want to share the same “scan” across streaming and batch, then we need to
>>>>> “branch” in the API after that point, but that is at odds with failing
>>>>> fast. We could use capabilities to fail fast and not worry about that
>>>>> concern in the read design.
>>>>>
>>>>> I also want to use capabilities to change the behavior of some
>>>>> validation rules. The rule that validates appends, for example, doesn’t
>>>>> allow a write that is missing an optional column. That’s because the
>>>>> current v1 sources don’t support reading when columns are missing. But
>>>>> Iceberg does support reading a missing column as nulls, so that users can
>>>>> add a column to a table without breaking a scheduled job that populates 
>>>>> the
>>>>> table. To fix this problem, I would use a table capability, like
>>>>> read-missing-columns-as-null.
>>>>>
>>>>> Any comments on this approach?
>>>>>
>>>>> rb
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 capability API

2018-11-09 Thread Ryan Blue
For that case, I think we would have a property that defines whether
supports-decimal is assumed or checked with the capability.

Wouldn't we have this problem no matter what the capability API is? If we
used a trait to signal decimal support, then we would have to deal with
sources that were written before the trait was introduced. That doesn't
change the need for some way to signal support for specific capabilities
like the ones I've suggested.

On Fri, Nov 9, 2018 at 12:38 PM Reynold Xin  wrote:

> "If there is no way to report a feature (e.g., able to read missing as
> null) then there is no way for Spark to take advantage of it in the first
> place"
>
> Consider this (just a hypothetical scenario): We added "supports-decimal"
> in the future, because we see a lot of data sources don't support decimal
> and we want a more graceful error handling. That'd break all existing data
> sources.
>
> You can say we would never add any "existing" features to the feature list
> in the future, as a requirement for the feature list. But then I'm
> wondering how much does it really give you, beyond telling data sources to
> throw exceptions when they don't support a specific operation.
>
>
> On Fri, Nov 9, 2018 at 11:54 AM Ryan Blue  wrote:
>
>> Do you have an example in mind where we might add a capability and break
>> old versions of data sources?
>>
>> These are really for being able to tell what features a data source has.
>> If there is no way to report a feature (e.g., able to read missing as null)
>> then there is no way for Spark to take advantage of it in the first place.
>> For the uses I've proposed, forward compatibility isn't a concern. When we
>> add a capability, we add handling for it that old versions wouldn't be able
>> to use anyway. The advantage is that we don't have to treat all sources the
>> same.
>>
>> On Fri, Nov 9, 2018 at 11:32 AM Reynold Xin  wrote:
>>
>>> How do we deal with forward compatibility? Consider, Spark adds a new
>>> "property". In the past the data source supports that property, but since
>>> it was not explicitly defined, in the new version of Spark that data source
>>> would be considered not supporting that property, and thus throwing an
>>> exception.
>>>
>>>
>>> On Fri, Nov 9, 2018 at 9:11 AM Ryan Blue  wrote:
>>>
>>>> I'd have two places. First, a class that defines properties supported
>>>> and identified by Spark, like the SQLConf definitions. Second, in
>>>> documentation for the v2 table API.
>>>>
>>>> On Fri, Nov 9, 2018 at 9:00 AM Felix Cheung 
>>>> wrote:
>>>>
>>>>> One question is where will the list of capability strings be defined?
>>>>>
>>>>>
>>>>> --
>>>>> *From:* Ryan Blue 
>>>>> *Sent:* Thursday, November 8, 2018 2:09 PM
>>>>> *To:* Reynold Xin
>>>>> *Cc:* Spark Dev List
>>>>> *Subject:* Re: DataSourceV2 capability API
>>>>>
>>>>>
>>>>> Yes, we currently use traits that have methods. Something like
>>>>> “supports reading missing columns” doesn’t need to deliver methods. The
>>>>> other example is where we don’t have an object to test for a trait (
>>>>> scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown
>>>>> done. That could be expensive so we can use a capability to fail faster.
>>>>>
>>>>> On Thu, Nov 8, 2018 at 1:54 PM Reynold Xin 
>>>>> wrote:
>>>>>
>>>>>> This is currently accomplished by having traits that data sources can
>>>>>> extend, as well as runtime exceptions right? It's hard to argue one way 
>>>>>> vs
>>>>>> another without knowing how things will evolve (e.g. how many different
>>>>>> capabilities there will be).
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 8, 2018 at 12:50 PM Ryan Blue 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I’d like to propose an addition to DataSourceV2 tables, a capability
>>>>>>> API. This API would allow Spark to query a table to determine whether it
>>>>>>> supports a capability or not:
>>>>>>>
>>>>>>> val table = catalog.load(identifier)
>

Re: DataSourceV2 capability API

2018-11-09 Thread Ryan Blue
Another solution to the decimal case is using the capability API: use a
capability to signal that the table knows about `supports-decimal`. So
before the decimal support check, it would check
`table.isSupported("type-capabilities")`.

On Fri, Nov 9, 2018 at 12:45 PM Ryan Blue  wrote:

> For that case, I think we would have a property that defines whether
> supports-decimal is assumed or checked with the capability.
>
> Wouldn't we have this problem no matter what the capability API is? If we
> used a trait to signal decimal support, then we would have to deal with
> sources that were written before the trait was introduced. That doesn't
> change the need for some way to signal support for specific capabilities
> like the ones I've suggested.
>
> On Fri, Nov 9, 2018 at 12:38 PM Reynold Xin  wrote:
>
>> "If there is no way to report a feature (e.g., able to read missing as
>> null) then there is no way for Spark to take advantage of it in the first
>> place"
>>
>> Consider this (just a hypothetical scenario): We added "supports-decimal"
>> in the future, because we see a lot of data sources don't support decimal
>> and we want a more graceful error handling. That'd break all existing data
>> sources.
>>
>> You can say we would never add any "existing" features to the feature
>> list in the future, as a requirement for the feature list. But then I'm
>> wondering how much does it really give you, beyond telling data sources to
>> throw exceptions when they don't support a specific operation.
>>
>>
>> On Fri, Nov 9, 2018 at 11:54 AM Ryan Blue  wrote:
>>
>>> Do you have an example in mind where we might add a capability and break
>>> old versions of data sources?
>>>
>>> These are really for being able to tell what features a data source has.
>>> If there is no way to report a feature (e.g., able to read missing as null)
>>> then there is no way for Spark to take advantage of it in the first place.
>>> For the uses I've proposed, forward compatibility isn't a concern. When we
>>> add a capability, we add handling for it that old versions wouldn't be able
>>> to use anyway. The advantage is that we don't have to treat all sources the
>>> same.
>>>
>>> On Fri, Nov 9, 2018 at 11:32 AM Reynold Xin  wrote:
>>>
>>>> How do we deal with forward compatibility? Consider, Spark adds a new
>>>> "property". In the past the data source supports that property, but since
>>>> it was not explicitly defined, in the new version of Spark that data source
>>>> would be considered not supporting that property, and thus throwing an
>>>> exception.
>>>>
>>>>
>>>> On Fri, Nov 9, 2018 at 9:11 AM Ryan Blue  wrote:
>>>>
>>>>> I'd have two places. First, a class that defines properties supported
>>>>> and identified by Spark, like the SQLConf definitions. Second, in
>>>>> documentation for the v2 table API.
>>>>>
>>>>> On Fri, Nov 9, 2018 at 9:00 AM Felix Cheung 
>>>>> wrote:
>>>>>
>>>>>> One question is where will the list of capability strings be defined?
>>>>>>
>>>>>>
>>>>>> --
>>>>>> *From:* Ryan Blue 
>>>>>> *Sent:* Thursday, November 8, 2018 2:09 PM
>>>>>> *To:* Reynold Xin
>>>>>> *Cc:* Spark Dev List
>>>>>> *Subject:* Re: DataSourceV2 capability API
>>>>>>
>>>>>>
>>>>>> Yes, we currently use traits that have methods. Something like
>>>>>> “supports reading missing columns” doesn’t need to deliver methods. The
>>>>>> other example is where we don’t have an object to test for a trait (
>>>>>> scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown
>>>>>> done. That could be expensive so we can use a capability to fail faster.
>>>>>>
>>>>>> On Thu, Nov 8, 2018 at 1:54 PM Reynold Xin 
>>>>>> wrote:
>>>>>>
>>>>>>> This is currently accomplished by having traits that data sources
>>>>>>> can extend, as well as runtime exceptions right? It's hard to argue one 
>>>>>>> way
>>>>>>> vs another without knowing how things will evolve (e.g. how many 
>>>>>>> different
>>&g

DataSourceV2 sync tomorrow

2018-11-13 Thread Ryan Blue
Hi everyone,
I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at
17:00 PST, which is 01:00 UTC.

Here are some of the topics under discussion in the last couple of weeks:

   - Read API for v2 - see Wenchen’s doc
   
<https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
   - Capabilities API - see the dev list thread
   
<https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
   - Using CatalogTableIdentifier to reliably separate v2 code paths - see PR
   #21978 <https://github.com/apache/spark/pull/21978>
   - A replacement for InternalRow

I know that a lot of people are also interested in combining the source API
for micro-batch and continuous streaming. Wenchen and I have been
discussing a way to do that and Wenchen has added it to the Read API doc as
Alternative #2. I think this would be a good thing to plan on discussing.

rb

Here’s some additional background on combining micro-batch and continuous
APIs:

The basic idea is to update how tasks end so that the same tasks can be
used in micro-batch or streaming. For tasks that are naturally limited like
data files, when the data is exhausted, Spark stops reading. For tasks that
are not limited, like a Kafka partition, Spark decides when to stop in
micro-batch mode by hitting a pre-determined LocalOffset or Spark can just
keep running in continuous mode.

Note that a task deciding to stop can happen in both modes, either when a
task is exhausted in micro-batch or when a stream needs to be reconfigured
in continuous.

Here’s the task reader API. The offset returned is optional so that a task
can avoid stopping if there isn’t a resumeable offset, like if it is in the
middle of an input file:

interface StreamPartitionReader extends InputPartitionReader {
  Optional currentOffset();
  boolean next() // from InputPartitionReader
  T get()// from InputPartitionReader
}

The streaming code would look something like this:

Stream stream = scan.toStream()
StreamReaderFactory factory = stream.createReaderFactory()

while (true) {
  Offset start = stream.currentOffset()
  Offset end = if (isContinuousMode) {
None
  } else {
// rate limiting would happen here
Some(stream.latestOffset())
  }

  InputPartition[] parts = stream.planInputPartitions(start)

  // returns when needsReconfiguration is true or all tasks finish
  runTasks(parts, factory, end)

  // the stream's current offset has been updated at the last epoch
}

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 sync tomorrow

2018-11-14 Thread Ryan Blue
Jamison, I've added you to the invite. If anyone else wants to be invited,
please send me a request. You can send it directly to me to avoid too many
messages on this thread.

On Wed, Nov 14, 2018 at 8:57 AM Jamison Bennett
 wrote:

> Hi Spark Team,
>
> I am interested in joining this meeting because I am interested in the
> data source v2 APIs. I couldn't find information about this meeting, so
> could someone please share the link?
>
> Thanks,
>
> Jamison Bennett
>
> Cloudera Software Engineer
>
> jamison.benn...@cloudera.com
>
> 515 Congress Ave, Suite 1212   |   Austin, TX   |   78701
>
>
> On Wed, Nov 14, 2018 at 1:51 AM Arun Mahadevan  wrote:
>
>> IMO, the currentOffset should not be optional.
>> For continuous mode I assume this offset gets periodically check pointed
>> (so mandatory) ?
>> For the micro batch mode the currentOffset would be the start offset for
>> a micro-batch.
>>
>> And if the micro-batch could be executed without knowing the 'latest'
>> offset (say until 'next' returns false), we only need the current offset
>> (to figure out the offset boundaries of a micro-batch) and may be then the
>> 'latest' offset is not needed at all.
>>
>> - Arun
>>
>>
>> On Tue, 13 Nov 2018 at 16:01, Ryan Blue 
>> wrote:
>>
>>> Hi everyone,
>>> I just wanted to send out a reminder that there’s a DSv2 sync tomorrow
>>> at 17:00 PST, which is 01:00 UTC.
>>>
>>> Here are some of the topics under discussion in the last couple of weeks:
>>>
>>>- Read API for v2 - see Wenchen’s doc
>>>
>>> <https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
>>>- Capabilities API - see the dev list thread
>>>
>>> <https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
>>>- Using CatalogTableIdentifier to reliably separate v2 code paths -
>>>see PR #21978 <https://github.com/apache/spark/pull/21978>
>>>- A replacement for InternalRow
>>>
>>> I know that a lot of people are also interested in combining the source
>>> API for micro-batch and continuous streaming. Wenchen and I have been
>>> discussing a way to do that and Wenchen has added it to the Read API doc as
>>> Alternative #2. I think this would be a good thing to plan on discussing.
>>>
>>> rb
>>>
>>> Here’s some additional background on combining micro-batch and
>>> continuous APIs:
>>>
>>> The basic idea is to update how tasks end so that the same tasks can be
>>> used in micro-batch or streaming. For tasks that are naturally limited like
>>> data files, when the data is exhausted, Spark stops reading. For tasks that
>>> are not limited, like a Kafka partition, Spark decides when to stop in
>>> micro-batch mode by hitting a pre-determined LocalOffset or Spark can just
>>> keep running in continuous mode.
>>>
>>> Note that a task deciding to stop can happen in both modes, either when
>>> a task is exhausted in micro-batch or when a stream needs to be
>>> reconfigured in continuous.
>>>
>>> Here’s the task reader API. The offset returned is optional so that a
>>> task can avoid stopping if there isn’t a resumeable offset, like if it is
>>> in the middle of an input file:
>>>
>>> interface StreamPartitionReader extends InputPartitionReader {
>>>   Optional currentOffset();
>>>   boolean next() // from InputPartitionReader
>>>   T get()// from InputPartitionReader
>>> }
>>>
>>> The streaming code would look something like this:
>>>
>>> Stream stream = scan.toStream()
>>> StreamReaderFactory factory = stream.createReaderFactory()
>>>
>>> while (true) {
>>>   Offset start = stream.currentOffset()
>>>   Offset end = if (isContinuousMode) {
>>> None
>>>   } else {
>>> // rate limiting would happen here
>>> Some(stream.latestOffset())
>>>   }
>>>
>>>   InputPartition[] parts = stream.planInputPartitions(start)
>>>
>>>   // returns when needsReconfiguration is true or all tasks finish
>>>   runTasks(parts, factory, end)
>>>
>>>   // the stream's current offset has been updated at the last epoch
>>> }
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 sync tomorrow

2018-11-14 Thread Ryan Blue
The live stream link for this is
https://stream.meet.google.com/stream/6be59d80-04c7-44dc-9042-4f3b597fc8ba

Some people said that it didn't work last time. I'm not sure why that would
happen, but I don't use these much so I'm no expert. If you can't join the
live stream, then feel free to join the meet up.

I'll also plan on joining earlier than I did last time, in case we the
meet/hangout needs to be up for people to view the live stream.

rb

On Tue, Nov 13, 2018 at 4:00 PM Ryan Blue  wrote:

> Hi everyone,
> I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at
> 17:00 PST, which is 01:00 UTC.
>
> Here are some of the topics under discussion in the last couple of weeks:
>
>- Read API for v2 - see Wenchen’s doc
>
> <https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
>- Capabilities API - see the dev list thread
>
> <https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
>- Using CatalogTableIdentifier to reliably separate v2 code paths -
>see PR #21978 <https://github.com/apache/spark/pull/21978>
>- A replacement for InternalRow
>
> I know that a lot of people are also interested in combining the source
> API for micro-batch and continuous streaming. Wenchen and I have been
> discussing a way to do that and Wenchen has added it to the Read API doc as
> Alternative #2. I think this would be a good thing to plan on discussing.
>
> rb
>
> Here’s some additional background on combining micro-batch and continuous
> APIs:
>
> The basic idea is to update how tasks end so that the same tasks can be
> used in micro-batch or streaming. For tasks that are naturally limited like
> data files, when the data is exhausted, Spark stops reading. For tasks that
> are not limited, like a Kafka partition, Spark decides when to stop in
> micro-batch mode by hitting a pre-determined LocalOffset or Spark can just
> keep running in continuous mode.
>
> Note that a task deciding to stop can happen in both modes, either when a
> task is exhausted in micro-batch or when a stream needs to be reconfigured
> in continuous.
>
> Here’s the task reader API. The offset returned is optional so that a task
> can avoid stopping if there isn’t a resumeable offset, like if it is in the
> middle of an input file:
>
> interface StreamPartitionReader extends InputPartitionReader {
>   Optional currentOffset();
>   boolean next() // from InputPartitionReader
>   T get()// from InputPartitionReader
> }
>
> The streaming code would look something like this:
>
> Stream stream = scan.toStream()
> StreamReaderFactory factory = stream.createReaderFactory()
>
> while (true) {
>   Offset start = stream.currentOffset()
>   Offset end = if (isContinuousMode) {
> None
>   } else {
> // rate limiting would happen here
> Some(stream.latestOffset())
>   }
>
>   InputPartition[] parts = stream.planInputPartitions(start)
>
>   // returns when needsReconfiguration is true or all tasks finish
>   runTasks(parts, factory, end)
>
>   // the stream's current offset has been updated at the last epoch
> }
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 sync tomorrow

2018-11-15 Thread Ryan Blue
Below are my notes from the sync yesterday. Thanks to everyone that
participated!
For the format of this sync, I think it would help to make a small change.
Since we have so many people and it take a long time to introduce everyone,
let’s try to get to the content faster by not doing the round of
introductions and topic gathering. Instead, please send your topics to the
sync thread on this list ahead of time. Just make sure I have them and I’ll
add them to the invite and agenda, along with any links for background.

I also want to add a quick note about the live stream. After running a
couple of tests, it looks like live streams only work within an
organization. In the future, I won’t add a live stream since no one but
people from Netflix can join.

Last, here are the notes:

*Attendees*

Ryan Blue - Netflix
John Zhuge - Netflix
Yuanjian Li - Baidu - Interested in Catalog API
Felix Cheung - Uber
Hyukjin Kwon - Hortonworks
Vinoo Ganesh - Palantir
Soumya Sanyal - ?
Bruce Robbins - Cloudera
Alessandro Bellina - Oath, here to learn
Jamison Bennett - Cloudera - Interested in Catalog API
Anton Okolnychyi - Apple
Gengliang Wang - DataBricks - ORC source
Wenchen Fan - DataBricks
Dilip Biswal - IBM - Push-down of new operators like limit
Kevin Yu - IBM
Matt Cheah - Palantir - Interested in CBO
Austin Nobis - Cloudera
Jungtaek Lim - Hortonworks - Interested in exactly-once semantics
Vikram Agrawal - Custom metrics
Sribasti Chakravarti

*Suggested Topics*

   - DSv2 API changes
  - New proposal
  - Alternative #1: Combining Scan with Batch or Stream
  - Alternative #2: Combining micro-batch and continuous APIs
   - Capabilities API
   - CatalogTableIdentifier
   - Push-down API
   - CBO and stats API
   - Exactly-once semantics

*Discussion*

The entire discussion was about the DSv2 API changes in Wenchen’s design
doc.

   - Wenchen went through the current status and the new proposal.
  - Not many questions, the API and behavior are clear and
  understandable.
  - Some discussion, started by Dilip about how join push-down will
  work. Ryan noted that we just need to make sure that the design doesn’t
  preclude reasonable options for later. Wenchen suggested one such option,
  to add methods to push a join into the ScanBuilder. It isn’t clear how
  exactly this will work, but consensus seemed to be that this
will not break
  later designs. Dilip has a join push-down design doc (please reply with a
  link!).
  - Consensus was to go with the new proposal.
   - Wenchen went through alternative #1, which merges Scan into the next
   layer to produce BatchScan, MicroBatchStreamScan, ContinuousStreamScan
  - Ryan’s commentary: concerned that Scan is a distinct concept and
  may be useful in implementations. Would merging it into other
objects cause
  duplication or force an inheritance hierarchy? Clearly, the
names show that
  it is mixing two concepts: BatchScan = Batch + Scan
  - Matt commented that it seems unlikely that Scan will be
  independently useful
  - Wenchen noted that we can merge later if it isn’t useful
  - Ryan noted that separate interfaces give the most flexibility for
  implementations. An implementation can create BatchScan that
extends both.
  - Conclusion: keep the interfaces separate for now and reassess later.
   - Ryan went through alternative #2, which merges micro-batch and
   continuous read interfaces
  - To merge execution code, Spark would be responsible for stopping
  tasks. Tasks would attempt to read forever and Spark determines
whether to
  run a batch or run forever.
  - Some tasks are naturally limited, like data files added to a table.
  Spark would need to handle tasks stopping themselves early.
  - Some tasks are naturally boundless, like Kafka topic partitions.
  Tasks would need to provide offsets for Spark to decide when to stop
  reading.
  - The resulting task reader behavior is awkward and no longer fits
  either naturally limited (must provide “offset”) nor naturally boundless
  tasks (why stop early? why use micro-batch?)
  - Conclusion was to have simpler APIs by keeping modes separate.


On Tue, Nov 13, 2018 at 4:00 PM Ryan Blue  wrote:

> Hi everyone,
> I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at
> 17:00 PST, which is 01:00 UTC.
>
> Here are some of the topics under discussion in the last couple of weeks:
>
>- Read API for v2 - see Wenchen’s doc
>
> <https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
>- Capabilities API - see the dev list thread
>
> <https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
>- Using CatalogTableIdentifier to reliably separate v2 code paths -
> 

Re: Make Scala 2.12 as default Scala version in Spark 3.0

2018-11-20 Thread Ryan Blue
+1 to removing 2.11 support for 3.0 and a PR.

It sounds like having multiple Scala builds is just not feasible and I
don't think this will be too disruptive for users since it is already a
breaking change.

On Tue, Nov 20, 2018 at 7:05 AM Sean Owen  wrote:

> One more data point -- from looking at the SBT build yesterday, it
> seems like most plugin updates require SBT 1.x. And both they and SBT
> 1.x seem to need Scala 2.12. And the new zinc also does.
> Now, the current SBT and zinc and plugins all appear to work OK with
> 2.12 now, but updating will pretty much have to wait until 2.11
> support goes. (I don't think it's feasible to have two SBT builds.)
>
> I actually haven't heard an argument for keeping 2.11, compared to the
> overhead of maintaining it. Any substantive objections? Would it be
> too forward to put out a WIP PR that removes it?
>
> On Sat, Nov 17, 2018 at 7:28 PM Sean Owen  wrote:
> >
> > I support dropping 2.11 support. My general logic is:
> >
> > - 2.11 is EOL, and is all the more EOL in the middle of next year when
> > Spark 3 arrives
> > - I haven't heard of a critical dependency that has no 2.12 counterpart
> > - 2.11 users can stay on 2.4.x, which will be notionally supported
> > through, say, end of 2019
> > - Maintaining 2.11 vs 2.12 support is modestly difficult, in my
> > experience resolving these differences across these two versions; it's
> > a hassle as you need two git clones with different scala versions in
> > the project tags
> > - The project is already short on resources to support things as it is
> > - Dropping things is generally necessary to add new things, to keep
> > complexity reasonable -- like Scala 2.13 support
> >
> > Maintaining a separate PR builder for 2.11 isn't so bad
> >
> > On Fri, Nov 16, 2018 at 4:09 PM Marcelo Vanzin
> >  wrote:
> > >
> > > Now that the switch to 2.12 by default has been made, it might be good
> > > to have a serious discussion about dropping 2.11 altogether. Many of
> > > the main arguments have already been talked about. But I don't
> > > remember anyone mentioning how easy it would be to break the 2.11
> > > build now.
> > >
> > > For example, the following works fine in 2.12 but breaks in 2.11:
> > >
> > > java.util.Arrays.asList("hi").stream().forEach(println)
> > >
> > > We had a similar issue when we supported java 1.6 but the builds were
> > > all on 1.7 by default. Every once in a while something would silently
> > > break, because PR builds only check the default. And the jenkins
> > > builds, which are less monitored, would stay broken for a while.
> > >
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

-- 
Ryan Blue
Software Engineer
Netflix


DataSourceV2 community sync #3

2018-11-26 Thread Ryan Blue
Hi everyone,

I just sent out an invite for the next DSv2 community sync for Wednesday,
28 Nov at 5PM PST.

We have a few topics left over from last time to cover. A few people wanted
to cover catalog APIs, so I put two items on the agenda:

   - The TableCatalog proposal (and other catalog APIs)
   - Using CatalogTableIdentifier to separate v1 and v2 code paths and
   avoid unintended behavior changes

As I noted in the summary last time, please send topics ahead of time so we
can get started more quickly.

If you would like to be added to the google hangout invite, please let me
know and I’ll add you. Thanks!

rb
-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 community sync #3

2018-11-29 Thread Ryan Blue
Hi everyone,

Here are my notes from last night’s sync. Some attendees that joined during
discussion may be missing, since I made the list while we were waiting for
people to join.

If you have topic suggestions for the next sync, please start sending them
to me. Thank you!

*Attendees:*

Ryan Blue
John Zhuge
Jamison Bennett
Yuanjian Li
Xiao Li
stczwd
Matt Cheah
Wenchen Fan
Genglian Wang
Kevin Yu
Maryann Xue
Cody Koeninger
Bruce Robbins
Rohit Karlupia

*Agenda:*

   - Follow-up issues or discussion on Wenchen’s PR #23086
   - TableCatalog proposal
   - CatalogTableIdentifier

*Notes:*

   - Discussion about PR #23086
  - Where should the catalog API live since it needs to be accessible
  to catalyst rules, but the catalyst module is private?
  - Wenchen suggested creating a sql-api module for v2 API interfaces,
  making catalyst depend on it
  - Consensus was to use Wenchen’s suggestion
   - In discussion about #23086, Xiao asked how adding catalog to a table
   identifier will work
  - Background from Ryan: existing code paths use TableIdentifier and
  don’t expect a catalog portion. If an identifier with a catalog
were passed
  to existing code, that code may use the default catalog not
knowing that a
  different one was requested, which would be incorrect behavior.
  - Ryan: The proposal for CatalogTableIdentifier addresses this
  problem. TableIdentifier is used for identifiers that have no
catalog set.
  By enforcing that requirement, passing a TableIdentifier to old code
  ensures that no catalogs leak into that code. This is also used when the
  catalog is set from context. For example, the TableCatalog API
accepts only
  TableIdentifier because the catalog is already determined.
   - Xiao asked whether FunctionIdentifier needs to be updated in the same
   way as CatalogTableIdentifier.
  - Ryan: Yes, when a FunctionCatalog API is added
   - The remaining time was spent discussing whether the plan to
   incrementally replace the current catalog API will work. [Not great notes
   here, feel free to add your take in a reply]
  - Xiao suggested that there are restrictions for how tables and
  functions interact. Because of this, he doesn’t think that separate
  TableCatalog and FunctionCatalog APIs are feasible.
  - Wenchen and Ryan think that functions should be orthogonal to data
  sources
  - Matt and Ryan think that catalog design can be done incrementally
  as new interfaces (i.e. FunctionCatalog) are added and that the proposed
  TableCatalog does not preclude designing for Xiao’s concerns later
  - [I forget who] pointed out that there are restrictions in some
  databases for views from different sources
  - There was some discussion about when functions or views cannot be
  orthogonal. For example, where the code runs is important.
Functions pushed
  to sources cannot necessarily be run on other sources and Spark functions
  cannot necessarily be pushed down to sources.
  - Xiao would like a full catalog replacement design, including views,
  databases, and functions and how they interact, before moving
forward with
  the proposed TableCatalog API
  - Ryan [and Matt, I think] think that TableCatalog is compatible with
  future decisions and the best path forward is to build incrementally. An
  exhaustive design process blocks progress on v2.


On Mon, Nov 26, 2018 at 2:54 PM Ryan Blue  wrote:

> Hi everyone,
>
> I just sent out an invite for the next DSv2 community sync for Wednesday,
> 28 Nov at 5PM PST.
>
> We have a few topics left over from last time to cover. A few people
> wanted to cover catalog APIs, so I put two items on the agenda:
>
>- The TableCatalog proposal (and other catalog APIs)
>- Using CatalogTableIdentifier to separate v1 and v2 code paths and
>avoid unintended behavior changes
>
> As I noted in the summary last time, please send topics ahead of time so
> we can get started more quickly.
>
> If you would like to be added to the google hangout invite, please let me
> know and I’ll add you. Thanks!
>
> rb
> --
> Ryan Blue
> Software Engineer
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 community sync #3

2018-11-29 Thread Ryan Blue
Xiao,

Please have a look at the pull requests and documents I've posted over the
last few months.

If you still have questions about how you might plug in Glue, let me know
and I can clarify.

rb

On Thu, Nov 29, 2018 at 2:56 PM Xiao Li  wrote:

> Ryan,
>
> Thanks for leading the discussion and sending out the memo!
>
>
>> Xiao suggested that there are restrictions for how tables and functions
>> interact. Because of this, he doesn’t think that separate TableCatalog and
>> FunctionCatalog APIs are feasible.
>
>
> Anything is possible. It depends on how we design the two interfaces. Now,
> most parts are unknown to me without seeing the design.
>
> I think we need to see the user stories, and high-level design before
> working on a small portion of Catalog federation. We do not need an
> exhaustive design in the current stage, but we need to know how the new
> proposal works. For example, how to plug in a new Hive metastore? How to
> plug in a Glue? How do users implement a new external catalog without
> adding any new data sources? Without knowing more details, it is hard to
> say whether this TableCatalog can satisfy all the requirements.
>
> Cheers,
>
> Xiao
>
>
> Ryan Blue  于2018年11月29日周四 下午2:32写道:
>
>> Hi everyone,
>>
>> Here are my notes from last night’s sync. Some attendees that joined
>> during discussion may be missing, since I made the list while we were
>> waiting for people to join.
>>
>> If you have topic suggestions for the next sync, please start sending
>> them to me. Thank you!
>>
>> *Attendees:*
>>
>> Ryan Blue
>> John Zhuge
>> Jamison Bennett
>> Yuanjian Li
>> Xiao Li
>> stczwd
>> Matt Cheah
>> Wenchen Fan
>> Genglian Wang
>> Kevin Yu
>> Maryann Xue
>> Cody Koeninger
>> Bruce Robbins
>> Rohit Karlupia
>>
>> *Agenda:*
>>
>>- Follow-up issues or discussion on Wenchen’s PR #23086
>>- TableCatalog proposal
>>- CatalogTableIdentifier
>>
>> *Notes:*
>>
>>- Discussion about PR #23086
>>   - Where should the catalog API live since it needs to be
>>   accessible to catalyst rules, but the catalyst module is private?
>>   - Wenchen suggested creating a sql-api module for v2 API
>>   interfaces, making catalyst depend on it
>>   - Consensus was to use Wenchen’s suggestion
>>- In discussion about #23086, Xiao asked how adding catalog to a
>>table identifier will work
>>   - Background from Ryan: existing code paths use TableIdentifier
>>   and don’t expect a catalog portion. If an identifier with a catalog 
>> were
>>   passed to existing code, that code may use the default catalog not 
>> knowing
>>   that a different one was requested, which would be incorrect behavior.
>>   - Ryan: The proposal for CatalogTableIdentifier addresses this
>>   problem. TableIdentifier is used for identifiers that have no catalog 
>> set.
>>   By enforcing that requirement, passing a TableIdentifier to old code
>>   ensures that no catalogs leak into that code. This is also used when 
>> the
>>   catalog is set from context. For example, the TableCatalog API accepts 
>> only
>>   TableIdentifier because the catalog is already determined.
>>- Xiao asked whether FunctionIdentifier needs to be updated in the
>>same way as CatalogTableIdentifier.
>>   - Ryan: Yes, when a FunctionCatalog API is added
>>- The remaining time was spent discussing whether the plan to
>>incrementally replace the current catalog API will work. [Not great notes
>>here, feel free to add your take in a reply]
>>   - Xiao suggested that there are restrictions for how tables and
>>   functions interact. Because of this, he doesn’t think that separate
>>   TableCatalog and FunctionCatalog APIs are feasible.
>>   - Wenchen and Ryan think that functions should be orthogonal to
>>   data sources
>>   - Matt and Ryan think that catalog design can be done
>>   incrementally as new interfaces (i.e. FunctionCatalog) are added and 
>> that
>>   the proposed TableCatalog does not preclude designing for Xiao’s 
>> concerns
>>   later
>>   - [I forget who] pointed out that there are restrictions in some
>>   databases for views from different sources
>>   - There was some discussion about when functions or views cannot
>>   be orthogonal. For example, where the code runs is important. Functions
>>   pushed to sources cannot necessari

Catalog API discussion (was: DataSourceV2 community sync #3)

2018-11-29 Thread Ryan Blue
Hi Wenchen,
I’ll add my responses inline. The answers are based on the proposed
TableCatalog API:

   - SPIP: Spark table metadata
   
<https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#heading=h.m45webtwxf2d>
   - PR #21306 <https://github.com/apache/spark/pull/21306>

On Wed, Nov 28, 2018 at 6:41 PM Wenchen Fan cloud0...@gmail.com
<http://mailto:cloud0...@gmail.com> wrote:

Thanks for hosting the discussion! I think the table catalog is super
> useful, but since this is the first time we allow users to extend catalog,
> it's better to write down some details from end-user APIs to internal
> management.
> 1. How would end-users register/unregister catalog with SQL API and
> Scala/Java API?
>
In the PR, users or administrators create catalogs by setting properties in
the SQL conf. To create and configure a test catalog implemented by
SomeCatalogClass, it looks like this:

spark.sql.catalog.test = com.example.SomeCatalogClass
spark.sql.catalog.test.config-var = value

For example, we have our own catalog, metacat, and we pass a service URI to
it and a property to tell it to use “prod” or “test” tables.

2. How would end-users manage catalogs? like LIST CATALOGS, USE CATALOG xyz?
>
Users and administrators can configure catalogs using properties like I
mentioned above. We could also implement the SQL statements like you
describe here.

Presto uses SHOW CATALOGS [LIKE prefix].

3. How to separate the abilities of catalog? Can we create a bunch of mixin
> triats for catalog API like SupportsTable, SupportsFunction, SupportsView,
> etc.?
>
What I’ve proposed is a base class, CatalogProvider
<https://github.com/apache/spark/pull/21306/files#diff-81c54123a7549b07a9d627353d9cbf95>,
that all catalogs inherit from. A CatalogProvider can be loaded as I
described above and is passed configuration through an initialize method.

Catalog implementations would also implement interfaces that carry a set of
methods for some task. What I’ve proposed is TableCatalog
<https://github.com/apache/spark/pull/21306/files#diff-a06043294c1e2c49a34aa0356f9e5450>
that exposes methods from the Table metadata APIs SPIP.

When a TableCatalog is used in a DDL statement like DROP TABLE, for
example, an analysis rule matches the raw SQL plan, resolves/loads the
catalog, and checks that it is a TableCatalog. Then passes on a logical
plan with the right catalog type:

case class DropTable(catalog: TableCatalog, table: TableIdentifier,
ifExists: Boolean) extends Command

4. How should Spark resolve identifies with catalog name? How to resolve
> ambiguity? What if the catalog doesn't support database? Can users write
> `catalogName.tblName` directly?
>
In #21978 <https://github.com/apache/spark/pull/21978>, I proposed
CatalogTableIdentifier that passes catalog, database, and table name. The
easiest and safest answer is to fill in a “current” catalog when missing
(just like the “current” database) and always interpret 2 identifiers as
database and table, never catalog and table.

How Spark decides to do this is really orthogonal to the catalog API.

5. Where does Spark store the catalog list? In an in-memory map?
>
SparkSession tracks catalog instances. Each catalog is loaded once (unless
we add some statement to reload) and cached in the session. The session is
how the current global catalog is accessed as well.

Another reason why catalogs are session-specific is that they can hold
important session-specific state. For example, Iceberg’s catalog caches
tables when loaded so that the same snapshot of a table is used for all
reads in a query. Not all table formats support this, so it is optional.

6. How to support atomic CTAS?
>
The plan we’ve discussed is to create tables with “staged” changes (see the
SPIP doc). When the write operation commits, all of the changes are
committed at once. I’m flexible on this and I think we have room for other
options as well. The current proposal only covers non-atomic CTAS.

7. The data/schema of table may change over time, when should Spark
> determine the table content? During analysis or planning?
>
Spark loads the table from a catalog during resolution rules, just like it
does with the global catalog now.

8. ...
>
> Since the catalog API is not only developer facing, but also user-facing,
> I think it's better to have a doc explaining what the developers concern
> and what the end users concern. The doc is also good for future reference,
> and can be used in release notes.
>
If you think the SPIP that I posed to the list in April needs extra
information, please let me know.

rb
-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 community sync #3

2018-11-29 Thread Ryan Blue
Xiao,

For the questions in this last email about how catalogs interact and how
functions and other future features work: we discussed those last night. As
I said then, I think that the right approach is incremental. We don’t want
to design all of that in one gigantic proposal up front. To do that is to
put ourselves into analysis paralysis.

We don’t have a design for how catalogs interact with one another, but I
think we made a strong case for two points: first, that the proposed
structure doesn’t preclude any of those future decisions (hence we should
proceed incrementally). Second, that those situations aren’t that hard to
think through if you’re concerned about them: functions that can run in
Spark can be run on any data, functions that run in external sources cannot
be run on any data.

You’re right that I haven’t completely covered your *new* questions. But to
the questions in your first email:

   - You asked how, for example, Glue may be plugged in. That is well
   covered in the PR that adds catalogs as a plugin
   <https://github.com/apache/spark/pull/21306#issue-187572913>, the
   response I sent to Wenchen’s questions, and the earlier discussion thread I
   posted to this list with the subject “[DISCUSS] Multiple catalog support”.
   The short answer is that implementations are configured with Spark config
   properties and loaded with reflection.
   - You asked how users implement an external catalog without adding new
   data sources. That’s also covered in the “Multiple catalog support”
   proposal, the table catalog PR, and ongoing discussions on the v2 redesign.
   The answer is that a catalog returns a table instance that implements the
   various interfaces from Wenchen’s work. A table may implement them directly
   or return other existing implementations. Here’s how it worked in the
   old API
   
<https://github.com/apache/spark/pull/21306/files#diff-db51e7934b9ee539ad599197a935cb86R35>
   .

I hope that you don’t think I expect you to go “without seeing the design”!

rb

On Thu, Nov 29, 2018 at 3:17 PM Xiao Li  wrote:

> Ryan,
>
> All the proposal I read is only related to Table metadata. Catalog
> contains the metadata of database, functions, columns, views, and so on.
> When we have multiple catalogs, how these catalogs interact with each
> other? How the global catalog works? How a view, table, function, database
> and column is resolved? Do we have nickname, mapping, wrapper?
>
> Or I might miss the design docs you send? Could you post the doc?
>
> Thanks,
>
> Xiao
>
>
>
>
> Ryan Blue  于2018年11月29日周四 下午3:06写道:
>
>> Xiao,
>>
>> Please have a look at the pull requests and documents I've posted over
>> the last few months.
>>
>> If you still have questions about how you might plug in Glue, let me know
>> and I can clarify.
>>
>> rb
>>
>> On Thu, Nov 29, 2018 at 2:56 PM Xiao Li  wrote:
>>
>>> Ryan,
>>>
>>> Thanks for leading the discussion and sending out the memo!
>>>
>>>
>>>> Xiao suggested that there are restrictions for how tables and functions
>>>> interact. Because of this, he doesn’t think that separate TableCatalog and
>>>> FunctionCatalog APIs are feasible.
>>>
>>>
>>> Anything is possible. It depends on how we design the two interfaces.
>>> Now, most parts are unknown to me without seeing the design.
>>>
>>> I think we need to see the user stories, and high-level design before
>>> working on a small portion of Catalog federation. We do not need an
>>> exhaustive design in the current stage, but we need to know how the new
>>> proposal works. For example, how to plug in a new Hive metastore? How to
>>> plug in a Glue? How do users implement a new external catalog without
>>> adding any new data sources? Without knowing more details, it is hard to
>>> say whether this TableCatalog can satisfy all the requirements.
>>>
>>> Cheers,
>>>
>>> Xiao
>>>
>>>
>>> Ryan Blue  于2018年11月29日周四 下午2:32写道:
>>>
>>>> Hi everyone,
>>>>
>>>> Here are my notes from last night’s sync. Some attendees that joined
>>>> during discussion may be missing, since I made the list while we were
>>>> waiting for people to join.
>>>>
>>>> If you have topic suggestions for the next sync, please start sending
>>>> them to me. Thank you!
>>>>
>>>> *Attendees:*
>>>>
>>>> Ryan Blue
>>>> John Zhuge
>>>> Jamison Bennett
>>>> Yuanjian Li
>>>> Xiao Li
>>>> stczwd
>>>> Matt Cheah
>>>> Wenchen F

Public v2 interface location

2018-11-30 Thread Ryan Blue
Hi everyone,

In the DSv2 sync this week, we discussed adding a new SQL module, sql-api,
that would contain the interfaces for authors to plug in external sources.
The rationale for adding this package is that the common logical plans and
rules to validate those plans should live in Catalyst, but no classes in
catalyst are currently public for plugin authors to extend. Catalyst would
depend on the sql-api module to pull in the interfaces that plugin authors
implement.

I was just working on moving the proposed TableCatalog interface into a new
sql-api module, but I ran into a problem: the new APIs still need to
reference classes in Catalyst, like DataType/StructType, AnalysisException,
and Statistics.

I don’t think it makes sense to move all of the referenced classes into
sql-api as well, but I could be convinced otherwise. If we decide not to
move them, then that leaves us back where we started: we can either expose
the v2 API from the catalyst package, or we can keep the v2 API, logical
plans, and rules in core instead of catalyst.

Anyone want to weigh in with a preference for how to move forward?

rb
-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 community sync #3

2018-12-01 Thread Ryan Blue
Xiao,

I do have opinions about how multi-catalog support should work, but I don't
think we are at a point where there is consensus. That's why I've started
discussion threads and added the CatalogTableIdentifier PR instead of a
comprehensive design doc. You have opinions about how users should interact
with catalogs as well (your "federated catalog") and we should discuss our
options here.

But the crucial point is that the user interaction doesn't need to be
completely decided in order to move forward. A design for multi-catalog
support isn't what we need right now; we need an API that plugins can
implement to expose table operations.

I've proposed that API, TableCatalog, and a way to manage catalog plugins.
I've made an argument for why I think that API is flexible enough for the
task and still fairly simple.

I think that we can add TableCatalog now and work on multi-catalog support
incrementally, and I have yet to hear your argument for why that is not the
case.

rb

On Sat, Dec 1, 2018 at 12:36 PM Xiao Li  wrote:

> Hi, Ryan,
>
> Catalog is a really important component for Spark SQL or any analytics
> platform, I have to emphasize. Thus, a careful design is needed to ensure
> it works as expected. Based on my previous discussion with many community
> members, Spark SQL needs a catalog interface so that we can mount multiple
> external physical catalogs and they can be presented as a single logical
> catalog [which is a so-called global federated catalog]. In the future, we
> can use this interface to develop our own catalog (instead of Hive
> metastore) for more efficient metadata management. We can also plug in ACL
> management if needed.
>
> Based on your previous answers, it sounds like you have many ideas in your
> mind about building a Catalog interface for Spark SQL, but it is not shown
> in the design doc. Could you write them down in a single doc? We can try to
> leave comments in the design doc, instead of discussing various issues in
> PRs, emails and meetings. It can also help the whole community understand
> your proposal and post their comments.
>
> Thanks,
>
> Xiao
>
>
>
> Ryan Blue  于2018年11月29日周四 下午7:06写道:
>
>> Xiao,
>>
>> For the questions in this last email about how catalogs interact and how
>> functions and other future features work: we discussed those last night. As
>> I said then, I think that the right approach is incremental. We don’t want
>> to design all of that in one gigantic proposal up front. To do that is to
>> put ourselves into analysis paralysis.
>>
>> We don’t have a design for how catalogs interact with one another, but I
>> think we made a strong case for two points: first, that the proposed
>> structure doesn’t preclude any of those future decisions (hence we should
>> proceed incrementally). Second, that those situations aren’t that hard to
>> think through if you’re concerned about them: functions that can run in
>> Spark can be run on any data, functions that run in external sources cannot
>> be run on any data.
>>
>> You’re right that I haven’t completely covered your *new* questions. But
>> to the questions in your first email:
>>
>>- You asked how, for example, Glue may be plugged in. That is well
>>covered in the PR that adds catalogs as a plugin
>><https://github.com/apache/spark/pull/21306#issue-187572913>, the
>>response I sent to Wenchen’s questions, and the earlier discussion thread 
>> I
>>posted to this list with the subject “[DISCUSS] Multiple catalog support”.
>>The short answer is that implementations are configured with Spark config
>>properties and loaded with reflection.
>>- You asked how users implement an external catalog without adding
>>new data sources. That’s also covered in the “Multiple catalog support”
>>proposal, the table catalog PR, and ongoing discussions on the v2 
>> redesign.
>>The answer is that a catalog returns a table instance that implements the
>>various interfaces from Wenchen’s work. A table may implement them 
>> directly
>>or return other existing implementations. Here’s how it worked in the
>>old API
>>
>> <https://github.com/apache/spark/pull/21306/files#diff-db51e7934b9ee539ad599197a935cb86R35>
>>.
>>
>> I hope that you don’t think I expect you to go “without seeing the
>> design”!
>>
>> rb
>>
>> On Thu, Nov 29, 2018 at 3:17 PM Xiao Li  wrote:
>>
>>> Ryan,
>>>
>>> All the proposal I read is only related to Table metadata. Catalog
>>> contains the metadata of database, functions, columns, views, an

Re: Public v2 interface location

2018-12-01 Thread Ryan Blue
Jackey,

The proposal to add a sql-api module was based on the need to have the SQL
API classes, like `Table` available to Catalyst so we can have logical
plans and analyzer rules in that module. But, nothing in Catalyst is public
and so it doesn't contain user-implemented APIs. There are 3 options to
solve that problem:

1. Add a module that catalyst depends on with the APIs, sql-api. But I ran
into the problem I described above: needing to depend on Catalyst classes.
2. Add the API to catalyst. The problem is adding publicly available API
classes to a previously non-public module.
3. Add the API to core. The problem here is that it is more difficult to
keep rules and logical plans in catalyst, where I would expect them to be.

I'm not sure which option is the right one, but I no longer think that
option #1 is very promising.

On Fri, Nov 30, 2018 at 10:47 PM JackyLee  wrote:

> Hi, Ryan Blue.
>
> I don't think it would be a good idea to add the sql-api module.
> I prefer to add sql-api to sql/core. The sql is just another representation
> of dataset, thus there is no need to add new module to do this. Besides, it
> would be easier to add sql-api in core.
>
> By the way, I don't think it's a good time to add sql api, we have not yet
> determined many details of the DataSource V2 API.
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 community sync #3

2018-12-01 Thread Ryan Blue
I try to avoid discussing each specific topic about the catalog federation
before we deciding the framework of multi-catalog supports.

I’ve tried to open discussions on this for the last 6+ months because we
need it. I understand that you’d like a comprehensive plan for supporting
more than one catalog before moving forward, but I think most of us are
okay with the incremental approach. It’s better to make progress toward the
goal.

In general, data source API V2 and catalog API should be orthogonal
I agree with you, and they are. The API that Wenchen is working on for
reading and writing data and the TableCatalog API are orthogonal efforts.
As I said, they only overlap with the Table interface, and clearly tables
loaded from a catalog need to be able to plug into the read/write API.

The reason these two efforts are related is that the community voted
to standardize
logical plans
<https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
Those standard plans have well-defined behavior for operations like CTAS,
instead of relying on the data source plugin to do … something undefined.
To implement this, we need a way for Spark to create tables, drop tables,
etc. That’s why we need a way for sources to plug in Table-related catalog
operations. (Sorry if this was already clear. I know I talked about it at
length in the first v2 sync up.)

While the two APIs are orthogonal and serve different purposes,
implementing common operations requires that we have both.

I would not call it a table catalog. I do not expect the data source
should/need to implement a catalog. Since you might want an atomic CTAS, we
can improve the table metadata resolution logic to support it with
different resolution priorities. For example, try to get the metadata from
the external data source, if the table metadata is not available in the
catalog.

It sounds like your definition of a “catalog” is different. I think you’re
referring to a global catalog? Could you explain what you’re talking about
here?

I’m talking about an API to interface with an external data source, which I
think we need for the reasons I outlined above. I don’t care what we call
it, but your comment seems to hint that there would be an API to look up
tables in external sources. That’s the thing I’m talking about.

CatalogTableIdentifier: The PR is doing nothing but adding an interface.

Yes. I opened this PR to discuss how Spark should track tables from
different catalogs and avoid passing those references to code paths that
don’t support them. The use of table identifiers with a catalog part was
discussed in the “Multiple catalog support” thread. I’ve also brought it up
and pointed out how I think it should be used in syncs a couple of times.

Sorry if this discussion isn’t how you would have done it, but it’s a
fairly simple idea that I don’t think requires its own doc.

On Sat, Dec 1, 2018 at 5:12 PM Xiao Li  wrote:

> Hi, Ryan,
>
> I try to avoid discussing each specific topic about the catalog federation
> before we deciding the framework of multi-catalog supports.
>
> -  *CatalogTableIdentifier*: The PR
> https://github.com/apache/spark/pull/21978 is doing nothing but adding an
> interface. In the PR, we did not discuss how to resolve it, any restriction
> on the naming and what is a catalog.This requires more doc for explaining
> it. For example,
> https://docs.microsoft.com/en-us/sql/t-sql/language-elements/transact-sql-syntax-conventions-transact-sql?view=sql-server-2017
> Normally, we do not merge a PR without showing how to use it.
>
> - *TableCatalog*: First, I would not call it a table catalog. I do not
> expect the data source should/need to implement a catalog. Since you might
> want an atomic CTAS, we can improve the table metadata resolution logic to
> support it with different resolution priorities. For example, try to get
> the metadata from the external data source, if the table metadata is not
> available in the catalog. However, the catalog should do what the catalog
> is expected to do. If we follow what our data source API V2 is doing,
> basically, the data source is just a table. It is not related to database,
> view, or function. Mixing catalog with data source API V2 just makes the
> whole things more complex.
>
> In general, data source API V2 and catalog API should be orthogonal. I
> believe the data source API V2 and catalog APIs are two separate projects.
> Hopefully, you understand my concern. If we really want to mix them
> together, I want to read the design of your multi-catalog support and
> understand more details.
>
> Thanks,
>
> Xiao
>
>
>
>
> Ryan Blue  于2018年12月1日周六 下午3:22写道:
>
>> Xiao,
>>
>> I do have opinions about how multi-catalog support should work, but I
>> don't think we are at a point where ther

Re: DataSourceV2 community sync #3

2018-12-03 Thread Ryan Blue
gt; sources are for table only.
>
> These data sources should not use the Catalog identifier to identify. That
> means, in "catalog.database.table", catalog is only used to identify the
> actual catalog instead of data sources.
>
> For a Spark cluster, we could mount multiple catalogs (e.g.,
> hive_metastore_1, hive_metastore_2 and glue_1) at the same time. We could
> get the metadata of the tables, database, functions by accessing different
> catalog: "hive_metastore_1.db1.tab1", "hive_metastore_2.db2.tab2",
> "glue.db3.tab2". In the future, if Spark has its own catalog
> implementation, we might have something like, "spark_catalog1.db3.tab2".
> The catalog will be used for registering all the external data sources,
> various Spark UDFs and so on.
>
> At the same time, we should NOT mix the table-level data sources with
> catalog support. That means, "Cassandra1.db1.tab1", "Kafka2.db2.tab1",
> "Hbase3.db1.tab2" will not appear.
>
> Do you agree on my definition of catalog in Spark SQL?
>
> Xiao
>
>
> Ryan Blue  于2018年12月1日周六 下午7:25写道:
>
>> I try to avoid discussing each specific topic about the catalog
>> federation before we deciding the framework of multi-catalog supports.
>>
>> I’ve tried to open discussions on this for the last 6+ months because we
>> need it. I understand that you’d like a comprehensive plan for supporting
>> more than one catalog before moving forward, but I think most of us are
>> okay with the incremental approach. It’s better to make progress toward the
>> goal.
>>
>> In general, data source API V2 and catalog API should be orthogonal
>> I agree with you, and they are. The API that Wenchen is working on for
>> reading and writing data and the TableCatalog API are orthogonal efforts.
>> As I said, they only overlap with the Table interface, and clearly tables
>> loaded from a catalog need to be able to plug into the read/write API.
>>
>> The reason these two efforts are related is that the community voted to 
>> standardize
>> logical plans
>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
>> Those standard plans have well-defined behavior for operations like CTAS,
>> instead of relying on the data source plugin to do … something undefined.
>> To implement this, we need a way for Spark to create tables, drop tables,
>> etc. That’s why we need a way for sources to plug in Table-related catalog
>> operations. (Sorry if this was already clear. I know I talked about it at
>> length in the first v2 sync up.)
>>
>> While the two APIs are orthogonal and serve different purposes,
>> implementing common operations requires that we have both.
>>
>> I would not call it a table catalog. I do not expect the data source
>> should/need to implement a catalog. Since you might want an atomic CTAS, we
>> can improve the table metadata resolution logic to support it with
>> different resolution priorities. For example, try to get the metadata from
>> the external data source, if the table metadata is not available in the
>> catalog.
>>
>> It sounds like your definition of a “catalog” is different. I think
>> you’re referring to a global catalog? Could you explain what you’re talking
>> about here?
>>
>> I’m talking about an API to interface with an external data source, which
>> I think we need for the reasons I outlined above. I don’t care what we call
>> it, but your comment seems to hint that there would be an API to look up
>> tables in external sources. That’s the thing I’m talking about.
>>
>> CatalogTableIdentifier: The PR is doing nothing but adding an interface.
>>
>> Yes. I opened this PR to discuss how Spark should track tables from
>> different catalogs and avoid passing those references to code paths that
>> don’t support them. The use of table identifiers with a catalog part was
>> discussed in the “Multiple catalog support” thread. I’ve also brought it up
>> and pointed out how I think it should be used in syncs a couple of times.
>>
>> Sorry if this discussion isn’t how you would have done it, but it’s a
>> fairly simple idea that I don’t think requires its own doc.
>>
>> On Sat, Dec 1, 2018 at 5:12 PM Xiao Li  wrote:
>>
>>> Hi, Ryan,
>>>
>>> I try to avoid discussing each specific topic about the catalog
>>> federation before we deciding the framework of multi-catalog supports.
>>>
>>> -  *CatalogTableIdentifier*: The PR
>>> https://github.com/ap

Re: DataSourceV2 community sync #3

2018-12-03 Thread Ryan Blue
Jayesh,

The current catalog in Spark is a little weird. It uses a Hive catalog and
adds metadata that only Spark understands to track tables, in addition to
regular Hive tables. Some of those tables are actually just pointers to
tables that exist in some other source of truth. This is what makes the
default implementation “generic”.

The reason why Spark is this way is that it only supports one global
catalog. I want Spark to be able to use multiple catalogs, so I can plug in
a catalog implementation that talks directly to the source of truth for
another system, like Cassandra or JDBC. But Cassandra tracks Cassandra
tables and wouldn’t be a generic catalog.

I also think that the easiest way to expose multiple catalogs is to simply
let users specify which one they want to interact with:

SELECT * FROM jdbc_test.db.table;

USE CATALOG cassandra_prod;
SELECT * FROM some_c_table;

The public-facing user interaction part is still open for discussion and I
think Xiao has a different opinion.

The work I’ve posed solves two different problems:

   1. How should Spark interact with catalog implementations? For tables,
   I’ve proposed the TableCatalog API and I wrote an SPIP explaining why.
   2. How should Spark internally track tables in different catalogs? For
   this, I’ve proposed a CatalogTableIdentifier and outlined how to keep these
   out of paths that don’t expect tables in other catalogs.

Spark catalog be the common denominator of the other catalogs (least
featured) or a super-feature catalog?

I think anything we want to expose in Spark would need to be in an API
implemented by catalogs. I prefer an incremental approach here: as we build
things that Spark can take advantage of, we can add them to the API that
catalog implementations provide.

On Sat, Dec 1, 2018 at 9:10 PM Thakrar, Jayesh jthak...@conversantmedia.com
<http://mailto:jthak...@conversantmedia.com> wrote:

Just curious on the need for a catalog within Spark.
>
>
>
> So Spark interface with other systems – many of which have a catalog of
> their own – e.g. RDBMSes, HBase, Cassandra, etc. and some don’t (e.g. HDFS,
> filesyststem, etc).
>
> So what is the purpose of having this catalog within Spark for tables
> defined in Spark (which could be a front for other “catalogs”)?
>
> Is it trying to fulfill some void/need…..
>
> Also, would the Spark catalog be the common denominator of the other
> catalogs (least featured) or a super-feature catalog?
>
>
>
> *From: *Xiao Li 
> *Date: *Saturday, December 1, 2018 at 10:49 PM
> *To: *Ryan Blue 
> *Cc: *"u...@spark.apache.org" 
> *Subject: *Re: DataSourceV2 community sync #3
>
>
>
> Hi, Ryan,
>
>
>
> Let us first focus on answering the most fundamental problem before
> discussing various related topics. What is a catalog in Spark SQL?
>
>
>
> My definition of catalog is based on the database catalog. Basically, the
> catalog provides a service that manage the metadata/definitions of database
> objects (e.g., database, views, tables, functions, user roles, and so on).
>
>
>
> In Spark SQL, all the external objects accessed through our data source
> APIs are called "tables". I do not think we will expand the support in the
> near future. That means, the metadata we need from the external data
> sources are for table only.
>
>
>
> These data sources should not use the Catalog identifier to identify. That
> means, in "catalog.database.table", catalog is only used to identify the
> actual catalog instead of data sources.
>
>
>
> For a Spark cluster, we could mount multiple catalogs (e.g.,
> hive_metastore_1, hive_metastore_2 and glue_1) at the same time. We could
> get the metadata of the tables, database, functions by accessing different
> catalog: "hive_metastore_1.db1.tab1", "hive_metastore_2.db2.tab2",
> "glue.db3.tab2". In the future, if Spark has its own catalog
> implementation, we might have something like, "spark_catalog1.db3.tab2".
> The catalog will be used for registering all the external data sources,
> various Spark UDFs and so on.
>
>
>
> At the same time, we should NOT mix the table-level data sources with
> catalog support. That means, "Cassandra1.db1.tab1", "Kafka2.db2.tab1",
> "Hbase3.db1.tab2" will not appear.
>
>
>
> Do you agree on my definition of catalog in Spark SQL?
>
>
>
> Xiao
>
>
>
>
>
> Ryan Blue  于2018年12月1日周六 下午7:25写道:
>
> I try to avoid discussing each specific topic about the catalog federation
> before we deciding the framework of multi-catalog supports.
>
> I’ve tried to open discussions on this for the last 6+ months because we
> need it. I understand that you’d like a comprehensive plan for supp

Re: DataSourceV2 community sync #3

2018-12-03 Thread Ryan Blue
Jayesh,

I don’t think this need is very narrow.

To have reliable behavior for CTAS, you need to:

   1. Check whether a table exists and fail. Right now, it is up to the
   source whether to continue with the write if the table already exists or to
   throw an exception, which is unreliable across sources.
   2. Create a table if it doesn’t exist.
   3. Drop the table if writing failed. In the current implementation, this
   can’t be done reliably because #1 is unreliable. So a failed CTAS has a
   side-effect that the table is created in some cases and a subsequent retry
   can fail because the table exists.

Leaving these operations up to the read/write API is why behavior isn’t
consistent today. It also increases the amount of work that a source needs
to do and mixes concerns (what to do in a write when the table doesn’t
exist). Spark is going to be a lot more predictable if we decompose the
behavior of these operations into create, drop, write, etc.

And in addition to CTAS, we want these operations to be exposed for
sources. If Spark can create a table, why wouldn’t you be able to run DROP
TABLE to remove it?

Last, Spark must be able to interact with the source of truth for tables.
If Spark can’t create a table in Cassandra, it should reject a CTAS
operation.

On Mon, Dec 3, 2018 at 9:52 AM Thakrar, Jayesh 
wrote:

> Thank you Xiao – I was wondering what was the motivation for the catalog.
>
> If CTAS is the only candidate, would it suffice to have that as part of
> the data source interface only?
>
>
>
> If we look at BI, ETL and reporting tools which interface with many tables
> from different data sources at the same time, it makes sense to have a
> metadata catalog as the catalog is used to “design” the work for that tool
> (e.g. ETL processing unit, etc). Furthermore, the catalog serves as a data
> mapping to map external data types to the tool’s data types.
>
>
>
> Is the vision to move in that direction for Spark with the catalog
> support/feature?
>
> Also, is the vision to also incorporate the “options” specified for the
> data source into the catalog too?
>
> That may be helpful in some situations (e.g. the JDBC connect string being
> available from the catalog).
>
> *From: *Xiao Li 
> *Date: *Monday, December 3, 2018 at 10:44 AM
> *To: *"Thakrar, Jayesh" 
> *Cc: *Ryan Blue , "u...@spark.apache.org" <
> dev@spark.apache.org>
> *Subject: *Re: DataSourceV2 community sync #3
>
>
>
> Hi, Jayesh,
>
>
>
> This is a good question. Spark is a unified analytics engine for various
> data sources. We are able to get the table schema from the underlying data
> sources via our data source APIs. Thus, it resolves most of the user
> requirements. Spark does not need the other info (like database, function,
> and views) that are stored in the local catalog. Note, Spark is not a query
> engine for a specific data source. Thus, we did not accept any public API
> that does not have an implementation in the past. I believe this still
> holds.
>
>
>
> The catalog is part of the Spark SQL in the initial design and
> implementation. For the data sources that do not have catalog, they can use
> our catalog as a single source of truth. If they already have their own
> catalog, normally, they use the underlying data sources as the single
> source of truth. The table metadata in the Spark catalog is kind of a view
> of their physical schema that are stored in their local catalog. To support
> an atomic CREATE TABLE AS SELECT that requires modifying the catalog and
> data, we can add an interface for data sources but that is not part of
> catalog interface. The CTAS will not bypass our catalog. We will still
> register it in our catalog and the schema may or may not be stored in our
> catalog.
>
>
>
> Will we define a super-feature catalog that can support all the data
> sources?
>
>
>
> Based on my understanding, it is very hard. The priority is low based on
> our current scope of Spark SQL. If you want to do it, your design needs to
> consider how it works between global and local catalogs. This also requires
> a SPIP and voting. If you want to develop it incrementally without a
> design, I would suggest you to do it in your own fork. In the past, Spark
> on K8S was developed in a separate fork and then merged to the upstream of
> Apache Spark.
>
>
>
> Welcome your contributions and let us make Spark great!
>
>
>
> Cheers,
>
>
>
> Xiao
>
>
>
> Thakrar, Jayesh  于2018年12月1日周六 下午9:10写道:
>
> Just curious on the need for a catalog within Spark.
>
>
>
> So Spark interface with other systems – many of which have a catalog of
> their own – e.g. RDBMSes, HBase, Cassandra, etc. and some don’t (e.g. HDFS,
&g

Re: [SPARK-26160] Make assertNotBucketed call in DataFrameWriter::save optional

2018-12-10 Thread Ryan Blue
ed in this transmission is privileged and
> confidential information intended only for the use of the individual or
> entity named above. If the reader of this message is not the intended
> recipient, you are hereby notified that any dissemination, distribution or
> copying of this communication is strictly prohibited. If you have received
> this transmission in error, do not read it. Please immediately reply to the
> sender that you have received this communication in error and then delete
> it.
>
> Esta mensagem e seus anexos se dirigem exclusivamente ao seu destinatário,
> pode conter informação privilegiada ou confidencial e é para uso exclusivo
> da pessoa ou entidade de destino. Se não é vossa senhoria o destinatário
> indicado, fica notificado de que a leitura, utilização, divulgação e/ou
> cópia sem autorização pode estar proibida em virtude da legislação vigente.
> Se recebeu esta mensagem por erro, rogamos-lhe que nos o comunique
> imediatamente por esta mesma via e proceda a sua destruição
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: Pushdown in DataSourceV2 question

2018-12-11 Thread Ryan Blue
> Il giorno Sab 8 Dic 2018, 12:32 Jörn Franke  ha
>>>> scritto:
>>>>
>>>>> BTW. Even for json a pushdown can make sense to avoid that data is
>>>>> unnecessary ending in Spark ( because it would cause unnecessary 
>>>>> overhead).
>>>>> In the datasource v2 api you need to implement a SupportsPushDownFilter
>>>>>
>>>>> > Am 08.12.2018 um 10:50 schrieb Noritaka Sekiyama <
>>>>> moomind...@gmail.com>:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > I'm a support engineer, interested in DataSourceV2.
>>>>> >
>>>>> > Recently I had some pain to troubleshoot to check if pushdown is
>>>>> actually applied or not.
>>>>> > I noticed that DataFrame's explain() method shows pushdown even for
>>>>> JSON.
>>>>> > It totally depends on DataSource side, I believe. However, I would
>>>>> like Spark to have some way to confirm whether specific pushdown is
>>>>> actually applied in DataSource or not.
>>>>> >
>>>>> > # Example
>>>>> > val df = spark.read.json("s3://sample_bucket/people.json")
>>>>> > df.printSchema()
>>>>> > df.filter($"age" > 20).explain()
>>>>> >
>>>>> > root
>>>>> >  |-- age: long (nullable = true)
>>>>> >  |-- name: string (nullable = true)
>>>>> >
>>>>> > == Physical Plan ==
>>>>> > *Project [age#47L, name#48]
>>>>> > +- *Filter (isnotnull(age#47L) && (age#47L > 20))
>>>>> >+- *FileScan json [age#47L,name#48] Batched: false, Format: JSON,
>>>>> Location: InMemoryFileIndex[s3://sample_bucket/people.json],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(age), 
>>>>> GreaterThan(age,20)],
>>>>> ReadSchema: struct
>>>>> >
>>>>> > # Comments
>>>>> > As you can see, PushedFilter is shown even if input data is JSON.
>>>>> > Actually this pushdown is not used.
>>>>> >
>>>>> > I'm wondering if it has been already discussed or not.
>>>>> > If not, this is a chance to have such feature in DataSourceV2
>>>>> because it would require some API level changes.
>>>>> >
>>>>> >
>>>>> > Warm regards,
>>>>> >
>>>>> > Noritaka Sekiyama
>>>>> >
>>>>>
>>>>> -
>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>
>>>>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: Self join

2018-12-11 Thread Ryan Blue
Marco,

Thanks for starting the discussion! I think it would be great to have a
clear description of the problem and a proposed solution. Do you have
anything like that? It would help bring the rest of us up to speed without
reading different pull requests.

Thanks!

rb

On Tue, Dec 11, 2018 at 3:54 AM Marco Gaido  wrote:

> Hi all,
>
> I'd like to bring to the attention of a more people a problem which has
> been there for long, ie, self joins. Currently, we have many troubles with
> them. This has been reported several times to the community and seems to
> affect many people, but as of now no solution has been accepted for it.
>
> I created a PR some time ago in order to address the problem (
> https://github.com/apache/spark/pull/21449), but Wenchen mentioned he
> tried to fix this problem too but so far no attempt was successful because
> there is no clear semantic (
> https://github.com/apache/spark/pull/21449#issuecomment-393554552).
>
> So I'd like to propose to discuss here which is the best approach for
> tackling this issue, which I think would be great to fix for 3.0.0, so if
> we decide to introduce breaking changes in the design, we can do that.
>
> Thoughts on this?
>
> Thanks,
> Marco
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: Self join

2018-12-12 Thread Ryan Blue
Marco,

I'm actually asking for a design doc that clearly states the problem and
proposes a solution. This is a substantial change and probably should be an
SPIP.

I think that would be more likely to generate discussion than referring to
PRs or a quick paragraph on the dev list, because the only people that are
looking at it now are the ones already familiar with the problem.

rb

On Wed, Dec 12, 2018 at 2:05 AM Marco Gaido  wrote:

> Thank you all for your answers.
>
> @Ryan Blue  sure, let me state the problem more
> clearly: imagine you have 2 dataframes with a common lineage (for instance
> one is derived from the other by some filtering or anything you prefer).
> And imagine you want to join these 2 dataframes. Currently, there is a fix
> by Reynold which deduplicates the join condition in case the condition is
> an equality one (please notice that in this case, it doesn't matter which
> one is on the left and which one on the right). But if the condition
> involves other comparisons, such as a ">" or a "<", this would result in an
> analysis error, because the attributes on both sides are the same (eg. you
> have the same id#3 attribute on both sides), and you cannot deduplicate
> them blindly as which one is on a specific side matters.
>
> @Reynold Xin  my proposal was to add a dataset id in
> the metadata of each attribute, so that in this case we can distinguish
> from which dataframe the attribute is coming from, ie. having the
> DataFrames `df1` and `df2` where `df2` is derived from `df1`,
> `df1.join(df2, df1("a") > df2("a"))` could be resolved because we would
> know that the first attribute is taken from `df1` and so it has to be
> resolved using it and the same for the other. But I am open to any approach
> to this problem, if other people have better ideas/suggestions.
>
> Thanks,
> Marco
>
> Il giorno mar 11 dic 2018 alle ore 18:31 Jörn Franke 
> ha scritto:
>
>> I don’t know your exact underlying business problem,  but maybe a graph
>> solution, such as Spark Graphx meets better your requirements. Usually
>> self-joins are done to address some kind of graph problem (even if you
>> would not describe it as such) and is for these kind of problems much more
>> efficient.
>>
>> Am 11.12.2018 um 12:44 schrieb Marco Gaido :
>>
>> Hi all,
>>
>> I'd like to bring to the attention of a more people a problem which has
>> been there for long, ie, self joins. Currently, we have many troubles with
>> them. This has been reported several times to the community and seems to
>> affect many people, but as of now no solution has been accepted for it.
>>
>> I created a PR some time ago in order to address the problem (
>> https://github.com/apache/spark/pull/21449), but Wenchen mentioned he
>> tried to fix this problem too but so far no attempt was successful because
>> there is no clear semantic (
>> https://github.com/apache/spark/pull/21449#issuecomment-393554552).
>>
>> So I'd like to propose to discuss here which is the best approach for
>> tackling this issue, which I think would be great to fix for 3.0.0, so if
>> we decide to introduce breaking changes in the design, we can do that.
>>
>> Thoughts on this?
>>
>> Thanks,
>> Marco
>>
>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: dsv2 remaining work

2018-12-13 Thread Ryan Blue
We discussed this issue in the sync. I'll be sending out a summary later
today, but we came to a conclusion on some of these.

For #1, there are 2 parts: the design and the implementation. We agreed
that the design should not include SaveMode. The implementation may include
SaveMode until we can replace it with Overwrite, #2. We decided to create a
release-blocking issue to remove SaveMode so we will not include the
redesign to DataSourceV2 in a release unless SaveMode has been removed from
the read/write API (not the public API).

Let's continue discussions on #3. I don't think removing SaveMode needs to
be blocked by this because the justification for keeping SaveMode was to
not break existing tests. Existing tests only rely on overwrite. I agree
that CTAS is important and I'd prefer to get that in before a release as
well, though we didn't talk about that.

rb

On Wed, Dec 12, 2018 at 4:58 PM Reynold Xin  wrote:

> Unfortunately I can't make it to the DSv2 sync today. Sending an email
> with my thoughts instead. I spent a few hours thinking about this. It's
> evident that progress has been slow, because this is an important API and
> people from different perspectives have very different requirements, and
> the priorities are weighted very differently (e.g. issues that are super
> important to one might be not as important to another, and people just talk
> past each other arguing why one ignored a broader issue in a PR or
> proposal).
>
> I think the only real way to make progress is to decouple the efforts into
> major areas, and make progress somewhat independently. Of course, some care
> is needed to take care of
>
> Here's one attempt at listing some of the remaining big rocks:
>
> 1. Basic write API -- with the current SaveMode.
>
> 2. Add Overwrite (or Replace) logical plan, and the associated API in
> Table.
>
> 3. Add APIs for per-table metadata operations (note that I'm not calling
> it a catalog API here). Create/drop/alter table goes here. We also need to
> figure out how to do this for the file system sources in which there is no
> underlying catalog. One idea is to treat the file system as a catalog (with
> arbitrary levels of databases). To do that, it'd be great if the identifier
> for a table is not a fixed 2 or 3 part name, but just a string array.
>
> 4. Remove SaveMode. This is blocked on at least 1 + 2, and potentially 3.
>
> 5. Design a stable, fast, smaller surface row format to replace the
> existing InternalRow (and all the internal data types), which is internal
> and unstable. This can be further decoupled into the design for each data
> type.
>
> The above are the big one I can think of. I probably missed some, but a
> lot of other smaller things can be improved on later.
>
>
>
>
>
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: Self join

2018-12-13 Thread Ryan Blue
Thanks for the extra context, Marco. I thought you were trying to propose a
solution.

On Thu, Dec 13, 2018 at 2:45 AM Marco Gaido  wrote:

> Hi Ryan,
>
> My goal with this email thread is to discuss with the community if there
> are better ideas (as I was told many other people tried to address this).
> I'd consider this as a brainstorming email thread. Once we have a good
> proposal, then we can go ahead with a SPIP.
>
> Thanks,
> Marco
>
> Il giorno mer 12 dic 2018 alle ore 19:13 Ryan Blue  ha
> scritto:
>
>> Marco,
>>
>> I'm actually asking for a design doc that clearly states the problem and
>> proposes a solution. This is a substantial change and probably should be an
>> SPIP.
>>
>> I think that would be more likely to generate discussion than referring
>> to PRs or a quick paragraph on the dev list, because the only people that
>> are looking at it now are the ones already familiar with the problem.
>>
>> rb
>>
>> On Wed, Dec 12, 2018 at 2:05 AM Marco Gaido 
>> wrote:
>>
>>> Thank you all for your answers.
>>>
>>> @Ryan Blue  sure, let me state the problem more
>>> clearly: imagine you have 2 dataframes with a common lineage (for instance
>>> one is derived from the other by some filtering or anything you prefer).
>>> And imagine you want to join these 2 dataframes. Currently, there is a fix
>>> by Reynold which deduplicates the join condition in case the condition is
>>> an equality one (please notice that in this case, it doesn't matter which
>>> one is on the left and which one on the right). But if the condition
>>> involves other comparisons, such as a ">" or a "<", this would result in an
>>> analysis error, because the attributes on both sides are the same (eg. you
>>> have the same id#3 attribute on both sides), and you cannot deduplicate
>>> them blindly as which one is on a specific side matters.
>>>
>>> @Reynold Xin  my proposal was to add a dataset id
>>> in the metadata of each attribute, so that in this case we can distinguish
>>> from which dataframe the attribute is coming from, ie. having the
>>> DataFrames `df1` and `df2` where `df2` is derived from `df1`,
>>> `df1.join(df2, df1("a") > df2("a"))` could be resolved because we would
>>> know that the first attribute is taken from `df1` and so it has to be
>>> resolved using it and the same for the other. But I am open to any approach
>>> to this problem, if other people have better ideas/suggestions.
>>>
>>> Thanks,
>>> Marco
>>>
>>> Il giorno mar 11 dic 2018 alle ore 18:31 Jörn Franke <
>>> jornfra...@gmail.com> ha scritto:
>>>
>>>> I don’t know your exact underlying business problem,  but maybe a graph
>>>> solution, such as Spark Graphx meets better your requirements. Usually
>>>> self-joins are done to address some kind of graph problem (even if you
>>>> would not describe it as such) and is for these kind of problems much more
>>>> efficient.
>>>>
>>>> Am 11.12.2018 um 12:44 schrieb Marco Gaido :
>>>>
>>>> Hi all,
>>>>
>>>> I'd like to bring to the attention of a more people a problem which has
>>>> been there for long, ie, self joins. Currently, we have many troubles with
>>>> them. This has been reported several times to the community and seems to
>>>> affect many people, but as of now no solution has been accepted for it.
>>>>
>>>> I created a PR some time ago in order to address the problem (
>>>> https://github.com/apache/spark/pull/21449), but Wenchen mentioned he
>>>> tried to fix this problem too but so far no attempt was successful because
>>>> there is no clear semantic (
>>>> https://github.com/apache/spark/pull/21449#issuecomment-393554552).
>>>>
>>>> So I'd like to propose to discuss here which is the best approach for
>>>> tackling this issue, which I think would be great to fix for 3.0.0, so if
>>>> we decide to introduce breaking changes in the design, we can do that.
>>>>
>>>> Thoughts on this?
>>>>
>>>> Thanks,
>>>> Marco
>>>>
>>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


[DISCUSS] Function plugins

2018-12-14 Thread Ryan Blue
Hi everyone,
I’ve been looking into improving how users of our Spark platform register
and use UDFs and I’d like to discuss a few ideas for making this easier.

The motivation for this is the use case of defining a UDF from SparkSQL or
PySpark. We want to make it easy to write JVM UDFs and use them from both
SQL and Python. Python UDFs work great in most cases, but we occasionally
don’t want to pay the cost of shipping data to python and processing it
there so we want to make it easy to register UDFs that will run in the JVM.

There is already syntax to create a function from a JVM class
<https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-function.html>
in SQL that would work, but this option requires using the Hive UDF API
instead of Spark’s simpler Scala API. It also requires argument translation
and doesn’t support codegen. Beyond the problem of the API and performance,
it is annoying to require registering every function individually with a CREATE
FUNCTION statement.

The alternative that I’d like to propose is to add a way to register a
named group of functions using the proposed catalog plugin API.

For anyone unfamiliar with the proposed catalog plugins, the basic idea is
to load and configure plugins using a simple property-based scheme. Those
plugins expose functionality through mix-in interfaces, like TableCatalog
to create/drop/load/alter tables. Another interface could be UDFCatalog
that can load UDFs.

interface UDFCatalog extends CatalogPlugin {
  UserDefinedFunction loadUDF(String name)
}

To use this, I would create a UDFCatalog class that returns my Scala
functions as UDFs. To look up functions, we would use both the catalog name
and the function name.

This would allow my users to write Scala UDF instances, package them using
a UDFCatalog class (provided by me), and easily use them in Spark with a
few configuration options to add the catalog in their environment.

This would also allow me to expose UDF libraries easily in my
configuration, like brickhouse
<https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Leveraging-Brickhouse-in-Spark2-pivot/m-p/59943>,
without users needing to ensure the Jar is loaded and register individual
functions.

Any thoughts on this high-level approach? I know that this ignores things
like creating and storing functions in a FunctionCatalog, and we’d have to
solve challenges with function naming (whether there is a db component).
Right now I’d like to think through the overall idea and not get too
focused on those details.

Thanks,

rb
-- 
Ryan Blue
Software Engineer
Netflix


Re: [DISCUSS] Function plugins

2018-12-18 Thread Ryan Blue
I agree that it probably isn’t feasible to support codegen.

My goal is to be able to have users code like they can in Scala, but change
registration so that they don’t need a SparkSession. This is easy with a
SparkSession:

In [2]: def plus(a: Int, b: Int): Int = a + b
plus: (a: Int, b: Int)Int

In [3]: spark.udf.register("plus", plus _)
Out[3]: UserDefinedFunction(,IntegerType,Some(List(IntegerType,
IntegerType)))

In [4]: %%sql
  : select plus(3,4)

Out[4]:
++
| UDF:plus(3, 4) |
++
| 7  |
++
  available as df0

I want to build a UDFCatalog that can handle indirect registration: a user
registers plus with some class that I control, and that class uses the
UDFCatalog interface to pass those UDFs to Spark. It would also handle the
translation to Spark’s UserDefinedFunction, just like when you use
spark.udf.register.

On Fri, Dec 14, 2018 at 7:02 PM Reynold Xin  wrote:

> I don’t think it is realistic to support codegen for UDFs. It’s hooked
> deep into intervals.
>
> On Fri, Dec 14, 2018 at 6:52 PM Matt Cheah  wrote:
>
>> How would this work with:
>>
>>1. Codegen – how does one generate code given a user’s UDF? Would the
>>user be able to specify the code that is generated that represents their
>>function? In practice that’s pretty hard to get right.
>>2. Row serialization and representation – Will the UDF receive
>>catalyst rows with optimized internal representations, or will Spark have
>>to convert to something more easily consumed by a UDF?
>>
>>
>>
>> Otherwise +1 for trying to get this to work without Hive. I think even
>> having something without codegen and optimized row formats is worthwhile if
>> only because it’s easier to use than Hive UDFs.
>>
>>
>>
>> -Matt Cheah
>>
>>
>>
>> *From: *Reynold Xin 
>> *Date: *Friday, December 14, 2018 at 1:49 PM
>> *To: *"rb...@netflix.com" 
>> *Cc: *Spark Dev List 
>> *Subject: *Re: [DISCUSS] Function plugins
>>
>>
>>
>> [image: Image removed by sender.]
>>
>> Having a way to register UDFs that are not using Hive APIs would be great!
>>
>>
>>
>>
>>
>>
>>
>> On Fri, Dec 14, 2018 at 1:30 PM, Ryan Blue 
>> wrote:
>>
>> Hi everyone,
>> I’ve been looking into improving how users of our Spark platform register
>> and use UDFs and I’d like to discuss a few ideas for making this easier.
>>
>> The motivation for this is the use case of defining a UDF from SparkSQL
>> or PySpark. We want to make it easy to write JVM UDFs and use them from
>> both SQL and Python. Python UDFs work great in most cases, but we
>> occasionally don’t want to pay the cost of shipping data to python and
>> processing it there so we want to make it easy to register UDFs that will
>> run in the JVM.
>>
>> There is already syntax to create a function from a JVM class
>> [docs.databricks.com]
>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__docs.databricks.com_spark_latest_spark-2Dsql_language-2Dmanual_create-2Dfunction.html&d=DwMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=hzwIMNQ9E99EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=A89zvby1qgVX4Zrstgfnlf1mCBIZUyOhADRR-czy4Fw&s=k_fqMI22guBLW5lj5ZJ21QeKoXoa6LuPP5yA2tlj-TE&e=>
>> in SQL that would work, but this option requires using the Hive UDF API
>> instead of Spark’s simpler Scala API. It also requires argument translation
>> and doesn’t support codegen. Beyond the problem of the API and performance,
>> it is annoying to require registering every function individually with a 
>> CREATE
>> FUNCTION statement.
>>
>> The alternative that I’d like to propose is to add a way to register a
>> named group of functions using the proposed catalog plugin API.
>>
>> For anyone unfamiliar with the proposed catalog plugins, the basic idea
>> is to load and configure plugins using a simple property-based scheme.
>> Those plugins expose functionality through mix-in interfaces, like
>> TableCatalog to create/drop/load/alter tables. Another interface could
>> be UDFCatalog that can load UDFs.
>>
>> interface UDFCatalog extends CatalogPlugin {
>>
>>   UserDefinedFunction loadUDF(String name)
>>
>> }
>>
>> To use this, I would create a UDFCatalog class that returns my Scala
>> functions as UDFs. To look up functions, we would use both the catalog name
>> and the function name.
>>
>> This would allow my users to write Scala UDF instances, package them
>> using a UDFCatalog class (provided

DataSourceV2 sync notes (#4)

2018-12-18 Thread Ryan Blue
Hi everyone, sorry these notes are late. I didn’t have the time to write
this up last week.

For anyone interested in the next sync, we decided to skip next week and
resume in early January. I’ve already sent the invite. As usual, if you
have topics you’d like to discuss or would like to be added to the invite
list, just let me know. Everyone is welcome.

rb

*Attendees*:
Ryan Blue
Xiao Li
Bruce Robbins
John Zhuge
Anton Okolnychyi
Jackey Lee
Jamison Bennett
Srabasti Banerjee
Thomas D’Silva
Wenchen Fan
Matt Cheah
Maryann Xue
(possibly others that entered after the start)

*Agenda*:

   - Current discussions from the v2 batch write PR: WriteBuilder and
   SaveMode
   - Continue sql-api discussion after looking at API dependencies
   - Capabilities API
   - Overview of TableCatalog proposal to sync understanding (if time)

*Notes*:

   - WriteBuilder:
  - Wenchen summarized the options (factory methods vs builder) and
  some trade-offs
  - What we need to accomplish now can be done with factory methods,
  which are simpler
  - A builder matches the structure of the read side
  - Ryan’s opinion is to use the builder for consistency and evolution.
  Builder makes it easier to change or remove parts without copying all of
  the args of a method.
  - Matt’s opinion is that evolution and maintenance is easier and good
  to match the read side
  - *Consensus was to use WriteBuilder instead of factory methods*
   - SaveMode:
  - Context: v1 passes SaveMode from the DataFrameWriter API to
  sources. The action taken for some mode and existing table state
depends on
  the source implementation, which is something the community
wants to fix in
  v2. But, v2 initially passed SaveMode to sources. The question is how and
  when to remove SaveMode.
  - Wenchen: the current API uses SaveMode and we don’t want to drop
  features
  - Ryan: The main requirement is removing this before the next
  release. We should not have a substantial API change without removing it
  because we would still require an API change.
  - Xiao: suggested creating a release-blocking issue.
  - *Consensus was to remove SaveMode before the next release, blocking
  if needed.*
  - Someone also stated that keeping SaveMode would make porting file
  sources to v2 easier
  - Ryan disagrees that using SaveMode makes porting file sources
  faster or easier.
   - Capatbilities API (this is a quick overview of a long conversation)
  - Context: there are several situations where a source needs to
  change how Spark behaves or Spark needs to check whether a
source supports
  some feature. For example, Spark checks whether a source supports batch
  writes, write-only sources that do not need validation need to tell Spark
  not to run validation rules, and sources that can read files with missing
  columns (e.g., Iceberg) need Spark to allow writes that are
missing columns
  if those columns are optional or have default values.
  - Xiao suggested handling this case by case and the conversation
  moved to discussing the motivating case for Netflix: allowing writes that
  do not include optional columns.
  - Wenchen and Maryann added that Spark should handle all default
  values so that this doesn’t differ across sources. Ryan agreed that would
  be good, but pointed out challenges.
  - There was a long discussion about how Spark could handle default
  values. The problem is that adding a column with a default creates a
  problem of reading older data. Maryann and Dilip pointed out that
  traditional databases handle default values at write time so the correct
  default is the default value at write time (instead of read time), but it
  is unclear how existing data is handled.
  - Matt and Ryan asked whether databases update existing rows when a
  default is added. But even if a database can update all existing
rows, that
  would not be reasonable for Spark, which in the worst case would need to
  update millions of immutable files. This is also not a reasonable
  requirement to put on sources, so Spark would need to have read-side
  defaults.
  - Xiao noted that it may be easier to treat internal and external
  sources differently so internal sources to handle defaults. Ryan pointed
  out that this is the motivation for adding a capability API.
  - *Consensus was to start a discuss thread on the dev list about
  default values.*
  - Discussion shifted to a different example: the need to disable
  validation for write-only tables. Consensus was that this use
case is valid.
  - Wenchen: capabilities would work to disable write validation, but
  should not be string based.
  - *Consensus was to use a capabilities API, but use an enum instead
  of strings.*
  - Open question: what other options should use a

[DISCUSS] Default values and data sources

2018-12-18 Thread Ryan Blue
Hi everyone,

This thread is a follow-up to a discussion that we started in the DSv2
community sync last week.

The problem I’m trying to solve is that the format I’m using DSv2 to
integrate supports schema evolution. Specifically, adding a new optional
column so that rows without that column get a default value (null for
Iceberg). The current validation rule for an append in DSv2 fails a write
if it is missing a column, so adding a column to an existing table will
cause currently-scheduled jobs that insert data to start failing. Clearly,
schema evolution shouldn't break existing jobs that produce valid data.

To fix this problem, I suggested option 1: adding a way for Spark to check
whether to fail when an optional column is missing. Other contributors in
the sync thought that Spark should go with option 2: Spark’s schema should
have defaults and Spark should handle filling in defaults the same way
across all sources, like other databases.

I think we agree that option 2 would be ideal. The problem is that it is
very hard to implement.

A source might manage data stored in millions of immutable Parquet files,
so adding a default value isn’t possible. Spark would need to fill in
defaults for files written before the column was added at read time (it
could fill in defaults in new files at write time). Filling in defaults at
read time would require Spark to fill in defaults for only some of the
files in a scan, so Spark would need different handling for each task
depending on the schema of that task. Tasks would also be required to
produce a consistent schema, so a file without the new column couldn’t be
combined into a task with a file that has the new column. This adds quite a
bit of complexity.

Other sources may not need Spark to fill in the default at all. A JDBC
source would be capable of filling in the default values itself, so Spark
would need some way to communicate the default to that source. If the
source had a different policy for default values (write time instead of
read time, for example) then behavior could still be inconsistent.

I think that this complexity probably isn’t worth consistency in default
values across sources, if that is even achievable.

In the sync we thought it was a good idea to send this out to the larger
group to discuss. Please reply with comments!

rb
-- 
Ryan Blue
Software Engineer
Netflix


<    1   2   3   4   5   >